diff --git a/containers_cpp/include/embb/containers/internal/llx_scx-inl.h b/containers_cpp/include/embb/containers/internal/llx_scx-inl.h index eb6d1a8..a867344 100644 --- a/containers_cpp/include/embb/containers/internal/llx_scx-inl.h +++ b/containers_cpp/include/embb/containers/internal/llx_scx-inl.h @@ -49,8 +49,19 @@ template< typename UserData, typename ValuePool > LlxScx::LlxScx(size_t max_links) : max_links_(max_links), max_threads_(embb::base::Thread::GetThreadsMaxCount()), - scx_record_list_pool_(max_threads_), - scx_record_pool_(max_threads_) { + scx_record_list_pool_(max_threads_ * max_threads_ * 2), + scx_record_pool_(max_threads_ * max_threads_ * 2), + // Disable "this is used in base member initializer" warning. + // We explicitly want this. +#ifdef EMBB_PLATFORM_COMPILER_MSVC +#pragma warning(push) +#pragma warning(disable:4355) +#endif + delete_operation_callback(*this, &self_t::DeleteOperationCallback), +#ifdef EMBB_PLATFORM_COMPILER_MSVC +#pragma warning(pop) +#endif + hp(delete_operation_callback, NULL, 2) { typedef embb::containers::internal::FixedSizeList llx_result_list_t; typedef embb::containers::internal::FixedSizeList @@ -86,9 +97,9 @@ bool LlxScx::TryLoadLinked( finalized = false; unsigned int thread_id = ThreadId(); // Order of initialization matters: - bool marked_1 = data_record->IsMarkedForFinalize(); - ScxRecord_t * curr_scx = data_record->ScxInfo().Load(); - OperationState curr_state = curr_scx->State(); + volatile bool marked_1 = data_record->IsMarkedForFinalize(); + ScxRecord_t * curr_scx = data_record->ScxInfo().Load(); + volatile OperationState curr_state = curr_scx->State(); bool marked_2 = data_record->IsMarkedForFinalize(); if (curr_state == OperationState::Aborted || (curr_state == OperationState::Comitted && !marked_2)) { @@ -101,7 +112,7 @@ bool LlxScx::TryLoadLinked( llx_result.data_record = data_record; llx_result.scx_record = curr_scx; llx_result.user_data = user_data_local; - thread_llx_results_[thread_id]->PushBack(llx_result); + assert(thread_llx_results_[thread_id]->PushBack(llx_result)); // Set return value: user_data = user_data_local; return true; @@ -122,6 +133,13 @@ bool LlxScx::TryLoadLinked( } template< typename UserData, typename ValuePool > +void LlxScx::ClearLinks() { + // Clear thread-local list of LLX results + unsigned int thread_id = ThreadId(); + thread_llx_results_[thread_id]->clear(); +} + +template< typename UserData, typename ValuePool > bool LlxScx::TryStoreConditional( embb::base::Atomic * cas_field, cas_t cas_value, @@ -160,15 +178,15 @@ bool LlxScx::TryStoreConditional( llx_result_it != llx_result_end && llx_result_it->data_record != *it; ++llx_result_it); - if (llx_result_it == llx_result_end) { + if (llx_result_it->data_record != *it) { // Missing LLX result for given linked data record, user did not // load-link a data record this SCX depends on. EMBB_THROW(embb::base::ErrorException, "Missing preceding LLX on a data record used as SCX dependency"); } // Copy SCX operation from LLX result of link dependency into list: - info_fields->PushBack( - llx_result_it->data_record->ScxInfo().Load()); + assert(info_fields->PushBack( + llx_result_it->data_record->ScxInfo().Load())); } // Clear thread-local list of LLX results thread_llx_results_[thread_id]->clear(); @@ -190,7 +208,12 @@ bool LlxScx::TryStoreConditional( OperationState::InProgress); // Allocate from pool as this operation description is global: ScxRecord_t * scx = scx_record_pool_.Allocate(new_scx); - return Help(scx); + bool result = Help(scx); + hp.GuardPointer(0, NULL); + hp.GuardPointer(1, NULL); + ClearLinks(); + hp.EnqueuePointerForDeletion(scx); + return result; } template< typename UserData, typename ValuePool > @@ -204,6 +227,7 @@ bool LlxScx::TryValidateLink( template< typename UserData, typename ValuePool > bool LlxScx::Help( ScxRecord_t * scx) { + hp.GuardPointer(0, scx); // We ensure that an SCX S does not change a data record // while it is frozen for another SCX S'. Instead, S uses // the information in the SCX record of S' to help S' @@ -221,13 +245,13 @@ bool LlxScx::Help( ++linked_it, ++scx_op_it) { DataRecord_t * r = *linked_it; ScxRecord * rinfo_old = *scx_op_it; + hp.GuardPointer(1, rinfo_old); // Try to freeze the data record by setting its SCX info field // to this SCX operation description: if (r->ScxInfo().CompareAndSwap(rinfo_old, scx)) { // Do not try to delete the sentinel scx record: if (rinfo_old != &DataRecord_t::DummyScx) { - scx_record_list_pool_.Free(rinfo_old->scx_ops_); - scx_record_pool_.Free(rinfo_old); + hp.EnqueuePointerForDeletion(rinfo_old); } } else { if (r->ScxInfo().Load() != scx) { @@ -264,6 +288,13 @@ bool LlxScx::Help( return true; } +template< typename UserData, typename ValuePool > +void LlxScx::DeleteOperationCallback( + ScxRecord_t * scx_record) { + scx_record_list_pool_.Free(scx_record->scx_ops_); + scx_record_pool_.Free(scx_record); +} + // LlxScxRecord template< typename UserData > diff --git a/containers_cpp/include/embb/containers/internal/llx_scx.h b/containers_cpp/include/embb/containers/internal/llx_scx.h index f246f32..5d0976c 100644 --- a/containers_cpp/include/embb/containers/internal/llx_scx.h +++ b/containers_cpp/include/embb/containers/internal/llx_scx.h @@ -34,6 +34,7 @@ #include #include #include +#include namespace embb { namespace containers { @@ -308,6 +309,7 @@ class LlxScx { private: typedef size_t cas_t; + typedef LlxScx self_t; typedef LlxScxRecord< UserData > DataRecord_t; typedef internal::ScxRecord< LlxScxRecord > ScxRecord_t; typedef typename ScxRecord_t::OperationState OperationState; @@ -342,6 +344,12 @@ class LlxScx { ); /** + * Clears the calling thread's active links previously established using + * \c TryLoadLinked. + */ + void ClearLinks(); + + /** * Actual implementation of StoreConditional operating on unified fields/values * of type cas_t. * Tentatively performs a single-record Store-Conditional operation on @@ -384,7 +392,7 @@ class LlxScx { ScxRecord_t * scx_record; UserData user_data; } LlxResult; - + /** * Resolves the calling thread's Id. */ @@ -396,6 +404,12 @@ class LlxScx { bool Help(ScxRecord_t * scx); /** + * The callback function, used to cleanup non-hazardous pointers. + * \see delete_pointer_callback + */ + void DeleteOperationCallback(ScxRecord_t * scx_record); + + /** * Maximum number of active links created via TryLoadLinked per thread. */ size_t max_links_; @@ -423,6 +437,17 @@ class LlxScx { */ embb::containers::internal::FixedSizeList< LlxResult > ** thread_llx_results_; + + /** + * Callback to the method that is called by hazard pointers if a pointer is + * not hazardous anymore, i.e., can safely be reused. + */ + embb::base::Function < void, ScxRecord_t * > delete_operation_callback; + + /** + * The hazard pointer object, used for memory management. + */ + embb::containers::internal::HazardPointer< ScxRecord_t * > hp; /** * Prevent default construction. diff --git a/containers_cpp/test/llx_scx_test.cc b/containers_cpp/test/llx_scx_test.cc index 26163ef..2e0ae61 100644 --- a/containers_cpp/test/llx_scx_test.cc +++ b/containers_cpp/test/llx_scx_test.cc @@ -48,10 +48,16 @@ LlxScxTest::LlxScxTest() : CreateUnit("SerialArrayTest") .Add(&LlxScxTest::SerialArrayTest, this); CreateUnit("ParallelTest") - .Add(&LlxScxTest::ParallelTest, this) + .Pre(&LlxScxTest::ParallelTestPre, this) + .Add(&LlxScxTest::ParallelTest, this, + static_cast(num_threads_), 1) .Post(&LlxScxTest::ParallelTestPost, this); } +void LlxScxTest::ParallelTestPre() { + embb_internal_thread_index_reset(); +} + void LlxScxTest::ParallelTest() { unsigned int thread_index; int return_val = embb_internal_thread_index(&thread_index); @@ -69,7 +75,9 @@ void LlxScxTest::ParallelTest() { Node n; bool finalized; // LLX on node the new element will be appended to: - llxscx_.TryLoadLinked(node, n, finalized); + if (!llxscx_.TryLoadLinked(node, n, finalized)) { + continue; + } if (n.next_ == next) { // Pointer still valid after LLX, try to append new node internal::FixedSizeList *> linked_deps(1); @@ -101,13 +109,18 @@ void LlxScxTest::ParallelTest() { } void LlxScxTest::ParallelTestPost() { + std::vector< std::pair > values; internal::LlxScxRecord * node = &head_llx; internal::LlxScxRecord * next = head_llx.Data().next_; while (next != 0) { - delete node; + values.push_back(std::make_pair( + next->Data().value_, + next->Data().count_.Load())); node = next; next = next->Data().next_; } + PT_ASSERT_EQ_MSG(static_cast(26 * num_threads_), values.size(), + "Unexpected size of result list"); } void LlxScxTest::SerialArrayTest() { diff --git a/containers_cpp/test/llx_scx_test.h b/containers_cpp/test/llx_scx_test.h index 06cb419..8d0e758 100644 --- a/containers_cpp/test/llx_scx_test.h +++ b/containers_cpp/test/llx_scx_test.h @@ -89,6 +89,7 @@ class LlxScxTest : public partest::TestCase { private: void SerialArrayTest(); + void ParallelTestPre(); void ParallelTest(); void ParallelTestPost();