Commit 7dbd2a90 by Tobias Fuchs

Merge branch 'embb327_llx_scx' of github.com:siemens/embb into embb327_llx_scx

parents f2dc8700 4a244a07
......@@ -49,8 +49,19 @@ template< typename UserData, typename ValuePool >
LlxScx<UserData, ValuePool>::LlxScx(size_t max_links)
: max_links_(max_links),
max_threads_(embb::base::Thread::GetThreadsMaxCount()),
scx_record_list_pool_(max_threads_),
scx_record_pool_(max_threads_) {
scx_record_list_pool_(max_threads_ * max_threads_ * 2),
scx_record_pool_(max_threads_ * max_threads_ * 2),
// Disable "this is used in base member initializer" warning.
// We explicitly want this.
#ifdef EMBB_PLATFORM_COMPILER_MSVC
#pragma warning(push)
#pragma warning(disable:4355)
#endif
delete_operation_callback(*this, &self_t::DeleteOperationCallback),
#ifdef EMBB_PLATFORM_COMPILER_MSVC
#pragma warning(pop)
#endif
hp(delete_operation_callback, NULL, 2) {
typedef embb::containers::internal::FixedSizeList<LlxResult>
llx_result_list_t;
typedef embb::containers::internal::FixedSizeList<ScxRecord_t>
......@@ -86,9 +97,9 @@ bool LlxScx<UserData, ValuePool>::TryLoadLinked(
finalized = false;
unsigned int thread_id = ThreadId();
// Order of initialization matters:
bool marked_1 = data_record->IsMarkedForFinalize();
ScxRecord_t * curr_scx = data_record->ScxInfo().Load();
OperationState curr_state = curr_scx->State();
volatile bool marked_1 = data_record->IsMarkedForFinalize();
ScxRecord_t * curr_scx = data_record->ScxInfo().Load();
volatile OperationState curr_state = curr_scx->State();
bool marked_2 = data_record->IsMarkedForFinalize();
if (curr_state == OperationState::Aborted ||
(curr_state == OperationState::Comitted && !marked_2)) {
......@@ -101,7 +112,7 @@ bool LlxScx<UserData, ValuePool>::TryLoadLinked(
llx_result.data_record = data_record;
llx_result.scx_record = curr_scx;
llx_result.user_data = user_data_local;
thread_llx_results_[thread_id]->PushBack(llx_result);
assert(thread_llx_results_[thread_id]->PushBack(llx_result));
// Set return value:
user_data = user_data_local;
return true;
......@@ -122,6 +133,13 @@ bool LlxScx<UserData, ValuePool>::TryLoadLinked(
}
template< typename UserData, typename ValuePool >
void LlxScx<UserData, ValuePool>::ClearLinks() {
// Clear thread-local list of LLX results
unsigned int thread_id = ThreadId();
thread_llx_results_[thread_id]->clear();
}
template< typename UserData, typename ValuePool >
bool LlxScx<UserData, ValuePool>::TryStoreConditional(
embb::base::Atomic<cas_t> * cas_field,
cas_t cas_value,
......@@ -160,15 +178,15 @@ bool LlxScx<UserData, ValuePool>::TryStoreConditional(
llx_result_it != llx_result_end &&
llx_result_it->data_record != *it;
++llx_result_it);
if (llx_result_it == llx_result_end) {
if (llx_result_it->data_record != *it) {
// Missing LLX result for given linked data record, user did not
// load-link a data record this SCX depends on.
EMBB_THROW(embb::base::ErrorException,
"Missing preceding LLX on a data record used as SCX dependency");
}
// Copy SCX operation from LLX result of link dependency into list:
info_fields->PushBack(
llx_result_it->data_record->ScxInfo().Load());
assert(info_fields->PushBack(
llx_result_it->data_record->ScxInfo().Load()));
}
// Clear thread-local list of LLX results
thread_llx_results_[thread_id]->clear();
......@@ -190,7 +208,12 @@ bool LlxScx<UserData, ValuePool>::TryStoreConditional(
OperationState::InProgress);
// Allocate from pool as this operation description is global:
ScxRecord_t * scx = scx_record_pool_.Allocate(new_scx);
return Help(scx);
bool result = Help(scx);
hp.GuardPointer(0, NULL);
hp.GuardPointer(1, NULL);
ClearLinks();
hp.EnqueuePointerForDeletion(scx);
return result;
}
template< typename UserData, typename ValuePool >
......@@ -204,6 +227,7 @@ bool LlxScx<UserData, ValuePool>::TryValidateLink(
template< typename UserData, typename ValuePool >
bool LlxScx<UserData, ValuePool>::Help(
ScxRecord_t * scx) {
hp.GuardPointer(0, scx);
// We ensure that an SCX S does not change a data record
// while it is frozen for another SCX S'. Instead, S uses
// the information in the SCX record of S' to help S'
......@@ -221,13 +245,13 @@ bool LlxScx<UserData, ValuePool>::Help(
++linked_it, ++scx_op_it) {
DataRecord_t * r = *linked_it;
ScxRecord<DataRecord_t> * rinfo_old = *scx_op_it;
hp.GuardPointer(1, rinfo_old);
// Try to freeze the data record by setting its SCX info field
// to this SCX operation description:
if (r->ScxInfo().CompareAndSwap(rinfo_old, scx)) {
// Do not try to delete the sentinel scx record:
if (rinfo_old != &DataRecord_t::DummyScx) {
scx_record_list_pool_.Free(rinfo_old->scx_ops_);
scx_record_pool_.Free(rinfo_old);
hp.EnqueuePointerForDeletion(rinfo_old);
}
} else {
if (r->ScxInfo().Load() != scx) {
......@@ -264,6 +288,13 @@ bool LlxScx<UserData, ValuePool>::Help(
return true;
}
template< typename UserData, typename ValuePool >
void LlxScx<UserData, ValuePool>::DeleteOperationCallback(
ScxRecord_t * scx_record) {
scx_record_list_pool_.Free(scx_record->scx_ops_);
scx_record_pool_.Free(scx_record);
}
// LlxScxRecord
template< typename UserData >
......
......@@ -34,6 +34,7 @@
#include <embb/containers/object_pool.h>
#include <embb/containers/lock_free_tree_value_pool.h>
#include <embb/containers/internal/fixed_size_list.h>
#include <embb/containers/internal/hazard_pointer.h>
namespace embb {
namespace containers {
......@@ -308,6 +309,7 @@ class LlxScx {
private:
typedef size_t cas_t;
typedef LlxScx<UserData, ValuePool> self_t;
typedef LlxScxRecord< UserData > DataRecord_t;
typedef internal::ScxRecord< LlxScxRecord<UserData> > ScxRecord_t;
typedef typename ScxRecord_t::OperationState OperationState;
......@@ -342,6 +344,12 @@ class LlxScx {
);
/**
* Clears the calling thread's active links previously established using
* \c TryLoadLinked.
*/
void ClearLinks();
/**
* Actual implementation of StoreConditional operating on unified fields/values
* of type cas_t.
* Tentatively performs a single-record Store-Conditional operation on
......@@ -384,7 +392,7 @@ class LlxScx {
ScxRecord_t * scx_record;
UserData user_data;
} LlxResult;
/**
* Resolves the calling thread's Id.
*/
......@@ -396,6 +404,12 @@ class LlxScx {
bool Help(ScxRecord_t * scx);
/**
* The callback function, used to cleanup non-hazardous pointers.
* \see delete_pointer_callback
*/
void DeleteOperationCallback(ScxRecord_t * scx_record);
/**
* Maximum number of active links created via TryLoadLinked per thread.
*/
size_t max_links_;
......@@ -423,6 +437,17 @@ class LlxScx {
*/
embb::containers::internal::FixedSizeList< LlxResult > **
thread_llx_results_;
/**
* Callback to the method that is called by hazard pointers if a pointer is
* not hazardous anymore, i.e., can safely be reused.
*/
embb::base::Function < void, ScxRecord_t * > delete_operation_callback;
/**
* The hazard pointer object, used for memory management.
*/
embb::containers::internal::HazardPointer< ScxRecord_t * > hp;
/**
* Prevent default construction.
......
......@@ -48,10 +48,16 @@ LlxScxTest::LlxScxTest() :
CreateUnit("SerialArrayTest")
.Add(&LlxScxTest::SerialArrayTest, this);
CreateUnit("ParallelTest")
.Add(&LlxScxTest::ParallelTest, this)
.Pre(&LlxScxTest::ParallelTestPre, this)
.Add(&LlxScxTest::ParallelTest, this,
static_cast<unsigned int>(num_threads_), 1)
.Post(&LlxScxTest::ParallelTestPost, this);
}
void LlxScxTest::ParallelTestPre() {
embb_internal_thread_index_reset();
}
void LlxScxTest::ParallelTest() {
unsigned int thread_index;
int return_val = embb_internal_thread_index(&thread_index);
......@@ -69,7 +75,9 @@ void LlxScxTest::ParallelTest() {
Node n;
bool finalized;
// LLX on node the new element will be appended to:
llxscx_.TryLoadLinked(node, n, finalized);
if (!llxscx_.TryLoadLinked(node, n, finalized)) {
continue;
}
if (n.next_ == next) {
// Pointer still valid after LLX, try to append new node
internal::FixedSizeList<LlxScxRecord<Node> *> linked_deps(1);
......@@ -101,13 +109,18 @@ void LlxScxTest::ParallelTest() {
}
void LlxScxTest::ParallelTestPost() {
std::vector< std::pair<char, int> > values;
internal::LlxScxRecord<Node> * node = &head_llx;
internal::LlxScxRecord<Node> * next = head_llx.Data().next_;
while (next != 0) {
delete node;
values.push_back(std::make_pair(
next->Data().value_,
next->Data().count_.Load()));
node = next;
next = next->Data().next_;
}
PT_ASSERT_EQ_MSG(static_cast<size_t>(26 * num_threads_), values.size(),
"Unexpected size of result list");
}
void LlxScxTest::SerialArrayTest() {
......
......@@ -89,6 +89,7 @@ class LlxScxTest : public partest::TestCase {
private:
void SerialArrayTest();
void ParallelTestPre();
void ParallelTest();
void ParallelTestPost();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment