Commit 4a244a07 by Tobias Fuchs

containers_cpp: added safe memory reclamation for operation descriptions in LlxScx

parent 2be2c1e2
...@@ -49,8 +49,19 @@ template< typename UserData, typename ValuePool > ...@@ -49,8 +49,19 @@ template< typename UserData, typename ValuePool >
LlxScx<UserData, ValuePool>::LlxScx(size_t max_links) LlxScx<UserData, ValuePool>::LlxScx(size_t max_links)
: max_links_(max_links), : max_links_(max_links),
max_threads_(embb::base::Thread::GetThreadsMaxCount()), max_threads_(embb::base::Thread::GetThreadsMaxCount()),
scx_record_list_pool_(max_threads_), scx_record_list_pool_(max_threads_ * max_threads_ * 2),
scx_record_pool_(max_threads_) { scx_record_pool_(max_threads_ * max_threads_ * 2),
// Disable "this is used in base member initializer" warning.
// We explicitly want this.
#ifdef EMBB_PLATFORM_COMPILER_MSVC
#pragma warning(push)
#pragma warning(disable:4355)
#endif
delete_operation_callback(*this, &self_t::DeleteOperationCallback),
#ifdef EMBB_PLATFORM_COMPILER_MSVC
#pragma warning(pop)
#endif
hp(delete_operation_callback, NULL, 2) {
typedef embb::containers::internal::FixedSizeList<LlxResult> typedef embb::containers::internal::FixedSizeList<LlxResult>
llx_result_list_t; llx_result_list_t;
typedef embb::containers::internal::FixedSizeList<ScxRecord_t> typedef embb::containers::internal::FixedSizeList<ScxRecord_t>
...@@ -86,9 +97,9 @@ bool LlxScx<UserData, ValuePool>::TryLoadLinked( ...@@ -86,9 +97,9 @@ bool LlxScx<UserData, ValuePool>::TryLoadLinked(
finalized = false; finalized = false;
unsigned int thread_id = ThreadId(); unsigned int thread_id = ThreadId();
// Order of initialization matters: // Order of initialization matters:
bool marked_1 = data_record->IsMarkedForFinalize(); volatile bool marked_1 = data_record->IsMarkedForFinalize();
ScxRecord_t * curr_scx = data_record->ScxInfo().Load(); ScxRecord_t * curr_scx = data_record->ScxInfo().Load();
OperationState curr_state = curr_scx->State(); volatile OperationState curr_state = curr_scx->State();
bool marked_2 = data_record->IsMarkedForFinalize(); bool marked_2 = data_record->IsMarkedForFinalize();
if (curr_state == OperationState::Aborted || if (curr_state == OperationState::Aborted ||
(curr_state == OperationState::Comitted && !marked_2)) { (curr_state == OperationState::Comitted && !marked_2)) {
...@@ -101,7 +112,7 @@ bool LlxScx<UserData, ValuePool>::TryLoadLinked( ...@@ -101,7 +112,7 @@ bool LlxScx<UserData, ValuePool>::TryLoadLinked(
llx_result.data_record = data_record; llx_result.data_record = data_record;
llx_result.scx_record = curr_scx; llx_result.scx_record = curr_scx;
llx_result.user_data = user_data_local; llx_result.user_data = user_data_local;
thread_llx_results_[thread_id]->PushBack(llx_result); assert(thread_llx_results_[thread_id]->PushBack(llx_result));
// Set return value: // Set return value:
user_data = user_data_local; user_data = user_data_local;
return true; return true;
...@@ -122,6 +133,13 @@ bool LlxScx<UserData, ValuePool>::TryLoadLinked( ...@@ -122,6 +133,13 @@ bool LlxScx<UserData, ValuePool>::TryLoadLinked(
} }
template< typename UserData, typename ValuePool > template< typename UserData, typename ValuePool >
void LlxScx<UserData, ValuePool>::ClearLinks() {
// Clear thread-local list of LLX results
unsigned int thread_id = ThreadId();
thread_llx_results_[thread_id]->clear();
}
template< typename UserData, typename ValuePool >
bool LlxScx<UserData, ValuePool>::TryStoreConditional( bool LlxScx<UserData, ValuePool>::TryStoreConditional(
embb::base::Atomic<cas_t> * cas_field, embb::base::Atomic<cas_t> * cas_field,
cas_t cas_value, cas_t cas_value,
...@@ -160,15 +178,15 @@ bool LlxScx<UserData, ValuePool>::TryStoreConditional( ...@@ -160,15 +178,15 @@ bool LlxScx<UserData, ValuePool>::TryStoreConditional(
llx_result_it != llx_result_end && llx_result_it != llx_result_end &&
llx_result_it->data_record != *it; llx_result_it->data_record != *it;
++llx_result_it); ++llx_result_it);
if (llx_result_it == llx_result_end) { if (llx_result_it->data_record != *it) {
// Missing LLX result for given linked data record, user did not // Missing LLX result for given linked data record, user did not
// load-link a data record this SCX depends on. // load-link a data record this SCX depends on.
EMBB_THROW(embb::base::ErrorException, EMBB_THROW(embb::base::ErrorException,
"Missing preceding LLX on a data record used as SCX dependency"); "Missing preceding LLX on a data record used as SCX dependency");
} }
// Copy SCX operation from LLX result of link dependency into list: // Copy SCX operation from LLX result of link dependency into list:
info_fields->PushBack( assert(info_fields->PushBack(
llx_result_it->data_record->ScxInfo().Load()); llx_result_it->data_record->ScxInfo().Load()));
} }
// Clear thread-local list of LLX results // Clear thread-local list of LLX results
thread_llx_results_[thread_id]->clear(); thread_llx_results_[thread_id]->clear();
...@@ -190,7 +208,12 @@ bool LlxScx<UserData, ValuePool>::TryStoreConditional( ...@@ -190,7 +208,12 @@ bool LlxScx<UserData, ValuePool>::TryStoreConditional(
OperationState::InProgress); OperationState::InProgress);
// Allocate from pool as this operation description is global: // Allocate from pool as this operation description is global:
ScxRecord_t * scx = scx_record_pool_.Allocate(new_scx); ScxRecord_t * scx = scx_record_pool_.Allocate(new_scx);
return Help(scx); bool result = Help(scx);
hp.GuardPointer(0, NULL);
hp.GuardPointer(1, NULL);
ClearLinks();
hp.EnqueuePointerForDeletion(scx);
return result;
} }
template< typename UserData, typename ValuePool > template< typename UserData, typename ValuePool >
...@@ -204,6 +227,7 @@ bool LlxScx<UserData, ValuePool>::TryValidateLink( ...@@ -204,6 +227,7 @@ bool LlxScx<UserData, ValuePool>::TryValidateLink(
template< typename UserData, typename ValuePool > template< typename UserData, typename ValuePool >
bool LlxScx<UserData, ValuePool>::Help( bool LlxScx<UserData, ValuePool>::Help(
ScxRecord_t * scx) { ScxRecord_t * scx) {
hp.GuardPointer(0, scx);
// We ensure that an SCX S does not change a data record // We ensure that an SCX S does not change a data record
// while it is frozen for another SCX S'. Instead, S uses // while it is frozen for another SCX S'. Instead, S uses
// the information in the SCX record of S' to help S' // the information in the SCX record of S' to help S'
...@@ -221,13 +245,13 @@ bool LlxScx<UserData, ValuePool>::Help( ...@@ -221,13 +245,13 @@ bool LlxScx<UserData, ValuePool>::Help(
++linked_it, ++scx_op_it) { ++linked_it, ++scx_op_it) {
DataRecord_t * r = *linked_it; DataRecord_t * r = *linked_it;
ScxRecord<DataRecord_t> * rinfo_old = *scx_op_it; ScxRecord<DataRecord_t> * rinfo_old = *scx_op_it;
hp.GuardPointer(1, rinfo_old);
// Try to freeze the data record by setting its SCX info field // Try to freeze the data record by setting its SCX info field
// to this SCX operation description: // to this SCX operation description:
if (r->ScxInfo().CompareAndSwap(rinfo_old, scx)) { if (r->ScxInfo().CompareAndSwap(rinfo_old, scx)) {
// Do not try to delete the sentinel scx record: // Do not try to delete the sentinel scx record:
if (rinfo_old != &DataRecord_t::DummyScx) { if (rinfo_old != &DataRecord_t::DummyScx) {
scx_record_list_pool_.Free(rinfo_old->scx_ops_); hp.EnqueuePointerForDeletion(rinfo_old);
scx_record_pool_.Free(rinfo_old);
} }
} else { } else {
if (r->ScxInfo().Load() != scx) { if (r->ScxInfo().Load() != scx) {
...@@ -264,6 +288,13 @@ bool LlxScx<UserData, ValuePool>::Help( ...@@ -264,6 +288,13 @@ bool LlxScx<UserData, ValuePool>::Help(
return true; return true;
} }
template< typename UserData, typename ValuePool >
void LlxScx<UserData, ValuePool>::DeleteOperationCallback(
ScxRecord_t * scx_record) {
scx_record_list_pool_.Free(scx_record->scx_ops_);
scx_record_pool_.Free(scx_record);
}
// LlxScxRecord // LlxScxRecord
template< typename UserData > template< typename UserData >
......
...@@ -34,6 +34,7 @@ ...@@ -34,6 +34,7 @@
#include <embb/containers/object_pool.h> #include <embb/containers/object_pool.h>
#include <embb/containers/lock_free_tree_value_pool.h> #include <embb/containers/lock_free_tree_value_pool.h>
#include <embb/containers/internal/fixed_size_list.h> #include <embb/containers/internal/fixed_size_list.h>
#include <embb/containers/internal/hazard_pointer.h>
namespace embb { namespace embb {
namespace containers { namespace containers {
...@@ -308,6 +309,7 @@ class LlxScx { ...@@ -308,6 +309,7 @@ class LlxScx {
private: private:
typedef size_t cas_t; typedef size_t cas_t;
typedef LlxScx<UserData, ValuePool> self_t;
typedef LlxScxRecord< UserData > DataRecord_t; typedef LlxScxRecord< UserData > DataRecord_t;
typedef internal::ScxRecord< LlxScxRecord<UserData> > ScxRecord_t; typedef internal::ScxRecord< LlxScxRecord<UserData> > ScxRecord_t;
typedef typename ScxRecord_t::OperationState OperationState; typedef typename ScxRecord_t::OperationState OperationState;
...@@ -342,6 +344,12 @@ class LlxScx { ...@@ -342,6 +344,12 @@ class LlxScx {
); );
/** /**
* Clears the calling thread's active links previously established using
* \c TryLoadLinked.
*/
void ClearLinks();
/**
* Actual implementation of StoreConditional operating on unified fields/values * Actual implementation of StoreConditional operating on unified fields/values
* of type cas_t. * of type cas_t.
* Tentatively performs a single-record Store-Conditional operation on * Tentatively performs a single-record Store-Conditional operation on
...@@ -396,6 +404,12 @@ class LlxScx { ...@@ -396,6 +404,12 @@ class LlxScx {
bool Help(ScxRecord_t * scx); bool Help(ScxRecord_t * scx);
/** /**
* The callback function, used to cleanup non-hazardous pointers.
* \see delete_pointer_callback
*/
void DeleteOperationCallback(ScxRecord_t * scx_record);
/**
* Maximum number of active links created via TryLoadLinked per thread. * Maximum number of active links created via TryLoadLinked per thread.
*/ */
size_t max_links_; size_t max_links_;
...@@ -425,6 +439,17 @@ class LlxScx { ...@@ -425,6 +439,17 @@ class LlxScx {
thread_llx_results_; thread_llx_results_;
/** /**
* Callback to the method that is called by hazard pointers if a pointer is
* not hazardous anymore, i.e., can safely be reused.
*/
embb::base::Function < void, ScxRecord_t * > delete_operation_callback;
/**
* The hazard pointer object, used for memory management.
*/
embb::containers::internal::HazardPointer< ScxRecord_t * > hp;
/**
* Prevent default construction. * Prevent default construction.
*/ */
LlxScx(); LlxScx();
......
...@@ -48,10 +48,16 @@ LlxScxTest::LlxScxTest() : ...@@ -48,10 +48,16 @@ LlxScxTest::LlxScxTest() :
CreateUnit("SerialArrayTest") CreateUnit("SerialArrayTest")
.Add(&LlxScxTest::SerialArrayTest, this); .Add(&LlxScxTest::SerialArrayTest, this);
CreateUnit("ParallelTest") CreateUnit("ParallelTest")
.Add(&LlxScxTest::ParallelTest, this) .Pre(&LlxScxTest::ParallelTestPre, this)
.Add(&LlxScxTest::ParallelTest, this,
static_cast<unsigned int>(num_threads_), 1)
.Post(&LlxScxTest::ParallelTestPost, this); .Post(&LlxScxTest::ParallelTestPost, this);
} }
void LlxScxTest::ParallelTestPre() {
embb_internal_thread_index_reset();
}
void LlxScxTest::ParallelTest() { void LlxScxTest::ParallelTest() {
unsigned int thread_index; unsigned int thread_index;
int return_val = embb_internal_thread_index(&thread_index); int return_val = embb_internal_thread_index(&thread_index);
...@@ -69,7 +75,9 @@ void LlxScxTest::ParallelTest() { ...@@ -69,7 +75,9 @@ void LlxScxTest::ParallelTest() {
Node n; Node n;
bool finalized; bool finalized;
// LLX on node the new element will be appended to: // LLX on node the new element will be appended to:
llxscx_.TryLoadLinked(node, n, finalized); if (!llxscx_.TryLoadLinked(node, n, finalized)) {
continue;
}
if (n.next_ == next) { if (n.next_ == next) {
// Pointer still valid after LLX, try to append new node // Pointer still valid after LLX, try to append new node
internal::FixedSizeList<LlxScxRecord<Node> *> linked_deps(1); internal::FixedSizeList<LlxScxRecord<Node> *> linked_deps(1);
...@@ -101,13 +109,18 @@ void LlxScxTest::ParallelTest() { ...@@ -101,13 +109,18 @@ void LlxScxTest::ParallelTest() {
} }
void LlxScxTest::ParallelTestPost() { void LlxScxTest::ParallelTestPost() {
std::vector< std::pair<char, int> > values;
internal::LlxScxRecord<Node> * node = &head_llx; internal::LlxScxRecord<Node> * node = &head_llx;
internal::LlxScxRecord<Node> * next = head_llx.Data().next_; internal::LlxScxRecord<Node> * next = head_llx.Data().next_;
while (next != 0) { while (next != 0) {
delete node; values.push_back(std::make_pair(
next->Data().value_,
next->Data().count_.Load()));
node = next; node = next;
next = next->Data().next_; next = next->Data().next_;
} }
PT_ASSERT_EQ_MSG(static_cast<size_t>(26 * num_threads_), values.size(),
"Unexpected size of result list");
} }
void LlxScxTest::SerialArrayTest() { void LlxScxTest::SerialArrayTest() {
......
...@@ -89,6 +89,7 @@ class LlxScxTest : public partest::TestCase { ...@@ -89,6 +89,7 @@ class LlxScxTest : public partest::TestCase {
private: private:
void SerialArrayTest(); void SerialArrayTest();
void ParallelTestPre();
void ParallelTest(); void ParallelTest();
void ParallelTestPost(); void ParallelTestPost();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment