From 746bd2fcacb73033e56a322ac0a66906c0e0ca54 Mon Sep 17 00:00:00 2001 From: Tobias Fuchs Date: Fri, 27 Mar 2015 06:03:14 +0100 Subject: [PATCH] containers_cpp: fixes of reclamation in LlxScx --- containers_cpp/include/embb/containers/internal/llx_scx-inl.h | 95 ++++++++++++++++++++++++++++++++++++++++++++++++++--------------------------------------------- containers_cpp/include/embb/containers/internal/llx_scx.h | 24 +++++++++++------------- containers_cpp/test/llx_scx_test.cc | 55 ++++++++++++++++++++++++++++++++++++++++++++++++------- containers_cpp/test/llx_scx_test.h | 3 +++ 4 files changed, 112 insertions(+), 65 deletions(-) diff --git a/containers_cpp/include/embb/containers/internal/llx_scx-inl.h b/containers_cpp/include/embb/containers/internal/llx_scx-inl.h index 5bda60d..5506f44 100644 --- a/containers_cpp/include/embb/containers/internal/llx_scx-inl.h +++ b/containers_cpp/include/embb/containers/internal/llx_scx-inl.h @@ -111,14 +111,14 @@ bool LlxScx::TryLoadLinked( } if (marked_1 && (curr_scx->State() == OperationState::Comitted || - (curr_scx->State() == OperationState::InProgress && curr_scx->Help()))) { + (curr_scx->State() == OperationState::InProgress && Help(curr_scx)))) { // Successfully completed active SCX: finalized = true; return false; } if (data_record->ScxInfo().Load()->State() == OperationState::InProgress) { // Help active SCX: - data_record->ScxInfo().Load()->Help(); + Help(data_record->ScxInfo().Load()); } return false; } @@ -212,10 +212,10 @@ bool LlxScx::TryStoreConditionalCAS( EMBB_THROW(embb::base::ErrorException, "Missing preceding LLX on a data record used for SCX"); } - // ScxRecord_t scx_op(*(l_it->data_record->ScxInfo().Load())); info_fields->PushBack( l_it->data_record->ScxInfo().Load()); } + thread_llx_results_[thread_id]->clear(); // Announce SCX operation. Lists linked_deps and finalize_dep are // guaranteed to remain on the stack until this announced operation // is completed, so no allocation/pool is necessary. @@ -234,7 +234,7 @@ bool LlxScx::TryStoreConditionalCAS( OperationState::InProgress); // Allocate from pool as this operation description is global: ScxRecord_t * scx = scx_record_pool_.Allocate(new_scx); - return scx->Help(); + return Help(scx); } template< typename UserData, typename ValuePool > @@ -243,82 +243,87 @@ bool LlxScx::TryValidateLink( return true; // @TODO } -// LlxScxRecord - -template< typename UserData > -LlxScxRecord::LlxScxRecord() -: marked_for_finalize_(false) { - scx_op_.Store(&dummy_scx); -} - -template< typename UserData > -LlxScxRecord::LlxScxRecord( - const UserData & user_data) -: user_data_(user_data), - marked_for_finalize_(false) { - scx_op_.Store(&dummy_scx); -} - -// internal::ScxRecord +// ScxRecord -template< typename DataRecord > -bool ScxRecord::Help() { +template< typename UserData, typename ValuePool > +bool LlxScx::Help( + ScxRecord_t * scx) { // We ensure that an SCX S does not change a data record // while it is frozen for another SCX S'. Instead, S uses // the information in the SCX record of S' to help S' // complete, so that the data record can be unfrozen. - typedef embb::containers::internal::FixedSizeList dr_list_t; - typedef embb::containers::internal::FixedSizeList op_list_t; + typedef embb::containers::internal::FixedSizeList dr_list_t; + typedef embb::containers::internal::FixedSizeList op_list_t; // Freeze all data records in data_records (i.e. reserve them for this // SCX operation) to protect their mutable fields from being changed by // other SCXs: - dr_list_t::iterator linked_it = linked_data_records_->begin(); - dr_list_t::iterator linked_end = linked_data_records_->end(); - op_list_t::iterator scx_op_it = scx_ops_->begin(); - op_list_t::iterator scx_op_end = scx_ops_->end(); + dr_list_t::iterator linked_it = scx->linked_data_records_->begin(); + dr_list_t::iterator linked_end = scx->linked_data_records_->end(); + op_list_t::iterator scx_op_it = scx->scx_ops_->begin(); + op_list_t::iterator scx_op_end = scx->scx_ops_->end(); for (; linked_it != linked_end && scx_op_it != scx_op_end; ++linked_it, ++scx_op_it) { - DataRecord * r = *linked_it; - ScxRecord * rinfo_old = *scx_op_it; + DataRecord_t * r = *linked_it; + ScxRecord * rinfo_old = *scx_op_it; // Try to freeze the data record by setting its SCX info field // to this SCX operation description: - if (!r->ScxInfo().CompareAndSwap(rinfo_old, this)) { - if (r->ScxInfo().Load() != this) { + if (!r->ScxInfo().CompareAndSwap(rinfo_old, scx)) { + if (r->ScxInfo().Load() != scx) { // could not freeze r because it is frozen for another SCX: - if (all_frozen_) { + if (scx->all_frozen_) { // SCX already completed: return true; } // Atomically unfreeze all nodes frozen for this SCX (see LLX): - state_ = Aborted; + scx->state_ = ScxRecord_t::Aborted; return false; } - } - else { - // free_scx_ops.PushBack(rinfo_old); + } else { + // Do not try to delete the sentinel scx record: + if (rinfo_old->field_ != 0) { + scx_record_list_pool_.Free(rinfo_old->scx_ops_); + scx_record_pool_.Free(rinfo_old); + } } } // finished freezing data records - assert(state_ == InProgress || state_ == Comitted); + assert(scx->state_ == ScxRecord_t::InProgress || + scx->state_ == ScxRecord_t::Comitted); // frozen step: - all_frozen_ = true; + scx->all_frozen_ = true; // mark step: - dr_list_t::iterator finalize_it = finalize_data_records_->begin(); - dr_list_t::iterator finalize_end = finalize_data_records_->end(); + dr_list_t::iterator finalize_it = scx->finalize_data_records_->begin(); + dr_list_t::iterator finalize_end = scx->finalize_data_records_->end(); for (; finalize_it != finalize_end; ++finalize_it) { (*finalize_it)->MarkForFinalize(); } // update CAS: - cas_t expected_old_value = old_value_; - field_->CompareAndSwap(expected_old_value, new_value_); + cas_t expected_old_value = scx->old_value_; + scx->field_->CompareAndSwap(expected_old_value, scx->new_value_); // Commit step. // Finalizes all r in data_records within finalize range and // unfreezes all r in data_records outside of finalize range. // Linearization point of this operation. - state_ = Comitted; + scx->state_ = ScxRecord_t::Comitted; return true; } +// LlxScxRecord + +template< typename UserData > +LlxScxRecord::LlxScxRecord() +: marked_for_finalize_(false) { + scx_op_.Store(&dummy_scx); +} + +template< typename UserData > +LlxScxRecord::LlxScxRecord( + const UserData & user_data) +: user_data_(user_data), + marked_for_finalize_(false) { + scx_op_.Store(&dummy_scx); +} + template< typename UserData > ScxRecord< LlxScxRecord > LlxScxRecord::dummy_scx = diff --git a/containers_cpp/include/embb/containers/internal/llx_scx.h b/containers_cpp/include/embb/containers/internal/llx_scx.h index 5235bcc..3805d58 100644 --- a/containers_cpp/include/embb/containers/internal/llx_scx.h +++ b/containers_cpp/include/embb/containers/internal/llx_scx.h @@ -29,6 +29,7 @@ #include #include +#include #include #include #include @@ -245,14 +246,14 @@ class ScxRecord { * Constructor. */ ScxRecord( - embb::containers::internal::FixedSizeList & + FixedSizeList & linked_data_records, - embb::containers::internal::FixedSizeList & + FixedSizeList & finalize_data_records, embb::base::Atomic * field, cas_t new_value, cas_t old_value, - embb::containers::internal::FixedSizeList * scx_ops, + FixedSizeList * scx_ops, OperationState operation_state) : linked_data_records_(&linked_data_records), finalize_data_records_(&finalize_data_records), @@ -267,25 +268,20 @@ class ScxRecord { OperationState State() const { return state_; } - - /** - * Returns true if helped operation has been completed. - */ - bool Help(); - - private: + + public: /** * Sequence of load-linked data records for this SCX operation. * Named 'V' in original publication. */ - const embb::containers::internal::FixedSizeList * + const FixedSizeList * linked_data_records_; /** * Sequence of data records to be finalized in this SCX operation. * Named 'R' in original publication. */ - const embb::containers::internal::FixedSizeList * + const FixedSizeList * finalize_data_records_; /** @@ -311,7 +307,7 @@ class ScxRecord { * List of SCX operation descriptions associated with data records * linked with this SCX operation. */ - embb::containers::internal::FixedSizeList * scx_ops_; + FixedSizeList * scx_ops_; /** * Current state of this SCX record. @@ -649,6 +645,8 @@ class LlxScx { */ LlxScx & operator=(const LlxScx &); + bool Help(ScxRecord_t * scx); + /** * Actual implementation of StoreConditional operating on unified fields/values * of type cas_t. diff --git a/containers_cpp/test/llx_scx_test.cc b/containers_cpp/test/llx_scx_test.cc index 3599511..44d2b51 100644 --- a/containers_cpp/test/llx_scx_test.cc +++ b/containers_cpp/test/llx_scx_test.cc @@ -41,21 +41,62 @@ LlxScxTest::LlxScxTest() : static_cast(partest::TestSuite::GetDefaultNumThreads())), llxscx_(3), tail(0, '-'), - head(0, '-', Node::node_ptr_t(&tail)) { + head(0, '-'), + tail_llx(tail), + head_llx(head) { CreateUnit("SerialArrayTest").Add(&LlxScxTest::SerialArrayTest, this); CreateUnit("SerialListTest").Add(&LlxScxTest::SerialListTest, this); - CreateUnit("ParallelTest").Add(&LlxScxTest::ParallelTest, this); +// CreateUnit("ParallelTest") +// .Add(&LlxScxTest::ParallelTest, this) +// .Post(&LlxScxTest::ParallelTestPost, this); } void LlxScxTest::ParallelTest() { - typedef LlxScxTest::Node Node; - unsigned int thread_index; int return_val = embb_internal_thread_index(&thread_index); if (return_val != EMBB_SUCCESS) - EMBB_THROW(embb::base::ErrorException, "Could not get thread id!"); - + EMBB_THROW(embb::base::ErrorException, "Could not get thread id!"); + // Threads try to append n nodes to a linked list in parallel + for (char value = 'a'; value <= 'z';) { + // Find node to append new element on: + internal::LlxScxRecord * node = &head_llx; + internal::LlxScxRecord * next = node->Data().next_; + while (next != 0 && next->Data().value_ < value) { + node = next; + next = next->Data().next_; + } + Node n; + llxscx_.TryLoadLinked(node, n); + if (n.next_ == next) { + // Pointer still valid after LLX, call SCX(node, node.next, new_node) + internal::FixedSizeList *> linked_deps(1); + linked_deps.PushBack(node); + // Create new node: + Node new_node(static_cast(thread_index), value); + internal::LlxScxRecord * new_node_ptr = + new internal::LlxScxRecord(new_node); + bool element_inserted = + llxscx_.TryStoreConditional( + &(node->Data().next_), + new_node_ptr, + linked_deps); + if (element_inserted) { + // Value has been added to list, continue with next value + ++value; + } + } + } +} + +void LlxScxTest::ParallelTestPost() { + internal::LlxScxRecord * node = &head_llx; + internal::LlxScxRecord * next = head_llx.Data().next_; + while (next != 0) { + delete node; + node = next; + next = next->Data().next_; + } } void LlxScxTest::SerialArrayTest() { @@ -67,7 +108,7 @@ void LlxScxTest::SerialArrayTest() { // Atomic not assignable, TryStoreConditional requires // a specialization for atomics that uses a.Store(b.Load()). AtomicField field(23); - // Initialize + LlxScxRecord< Payload > * my_list = new LlxScxRecord[10]; for (int i = 0; i != 10; ++i) { diff --git a/containers_cpp/test/llx_scx_test.h b/containers_cpp/test/llx_scx_test.h index 44def8c..92de8a3 100644 --- a/containers_cpp/test/llx_scx_test.h +++ b/containers_cpp/test/llx_scx_test.h @@ -91,11 +91,14 @@ class LlxScxTest : public partest::TestCase { void SerialArrayTest(); void SerialListTest(); void ParallelTest(); + void ParallelTestPost(); int num_threads_; internal::LlxScx llxscx_; Node tail; Node head; + internal::LlxScxRecord tail_llx; + internal::LlxScxRecord head_llx; }; } // namespace test -- libgit2 0.26.0