Commit 121a922f by Tobias Fuchs

containers_cpp: added LlxScx.TryValidateLink

parent 92213745
...@@ -31,6 +31,7 @@ ...@@ -31,6 +31,7 @@
#include <embb/base/thread.h> #include <embb/base/thread.h>
#include <embb/base/atomic.h> #include <embb/base/atomic.h>
#include <embb/base/memory_allocation.h> #include <embb/base/memory_allocation.h>
#include <algorithm>
namespace embb { namespace embb {
namespace containers { namespace containers {
...@@ -104,7 +105,7 @@ bool LlxScx<UserData, ValuePool>::TryLoadLinked( ...@@ -104,7 +105,7 @@ bool LlxScx<UserData, ValuePool>::TryLoadLinked(
// using guard 0 again. // using guard 0 again.
// This hazard pointer is validated in the nested if-block below. // This hazard pointer is validated in the nested if-block below.
hp.GuardPointer(1, curr_scx); hp.GuardPointer(1, curr_scx);
volatile OperationState curr_state = curr_scx->State(); volatile OperationState curr_state = curr_scx->state;
bool marked_2 = data_record->IsMarkedForFinalize(); bool marked_2 = data_record->IsMarkedForFinalize();
if (curr_state == OperationState::Aborted || if (curr_state == OperationState::Aborted ||
(curr_state == OperationState::Comitted && !marked_2)) { (curr_state == OperationState::Comitted && !marked_2)) {
...@@ -128,14 +129,14 @@ bool LlxScx<UserData, ValuePool>::TryLoadLinked( ...@@ -128,14 +129,14 @@ bool LlxScx<UserData, ValuePool>::TryLoadLinked(
} }
// Active SCX record of data record has been changed in between // Active SCX record of data record has been changed in between
if (marked_1 && if (marked_1 &&
(curr_scx->State() == OperationState::Comitted || (curr_scx->state == OperationState::Comitted ||
(curr_scx->State() == OperationState::InProgress && Help(curr_scx)))) { (curr_scx->state == OperationState::InProgress && Help(curr_scx)))) {
// Successfully completed the data record's active SCX but failed to // Successfully completed the data record's active SCX but failed to
// complete the LLX operation because the data record has been finalized: // complete the LLX operation because the data record has been finalized:
finalized = true; finalized = true;
return false; return false;
} }
if (data_record->ScxInfo().Load()->State() == OperationState::InProgress) { if (data_record->ScxInfo().Load()->state == OperationState::InProgress) {
// Help active SCX. // Help active SCX.
// This SCX record has been guarded above. // This SCX record has been guarded above.
ScxRecord_t * data_record_scx = data_record->ScxInfo().Load(); ScxRecord_t * data_record_scx = data_record->ScxInfo().Load();
...@@ -157,8 +158,12 @@ bool LlxScx<UserData, ValuePool>::TryStoreConditional( ...@@ -157,8 +158,12 @@ bool LlxScx<UserData, ValuePool>::TryStoreConditional(
cas_t cas_value, cas_t cas_value,
embb::containers::internal::FixedSizeList<DataRecord_t *> & linked_deps, embb::containers::internal::FixedSizeList<DataRecord_t *> & linked_deps,
embb::containers::internal::FixedSizeList<DataRecord_t *> & finalize_deps) { embb::containers::internal::FixedSizeList<DataRecord_t *> & finalize_deps) {
typedef embb::containers::internal::FixedSizeList<DataRecord_t *> dr_list_t; typedef embb::containers::internal::FixedSizeList<DataRecord_t *>
typedef embb::containers::internal::FixedSizeList<ScxRecord_t *> scx_op_list_t; dr_list_t;
typedef embb::containers::internal::FixedSizeList<ScxRecord_t *>
scx_op_list_t;
typedef embb::containers::internal::FixedSizeList<LlxResult>
llx_result_list_t;
// Preconditions: // Preconditions:
// 1. For each r in linked_deps, this thread has performed an invocation // 1. For each r in linked_deps, this thread has performed an invocation
// I_r of LLX(r) linked to this SCX. // I_r of LLX(r) linked to this SCX.
...@@ -168,30 +173,28 @@ bool LlxScx<UserData, ValuePool>::TryStoreConditional( ...@@ -168,30 +173,28 @@ bool LlxScx<UserData, ValuePool>::TryStoreConditional(
unsigned int thread_id = ThreadId(); unsigned int thread_id = ThreadId();
// Let info_fields be a table in shared memory containing for each r in V, // Let info_fields be a table in shared memory containing for each r in V,
// a copy of r's info value in this threads local table of LLX results. // a copy of r's info value in this threads local table of LLX results.
// Will be freed in Help() once the SCX operation has been completed.
// In brief: A list of the SCX record of all linked deps. // In brief: A list of the SCX record of all linked deps.
scx_op_list_t * info_fields = scx_record_list_pool_.Allocate(max_links_); scx_op_list_t * info_fields = scx_record_list_pool_.Allocate(max_links_);
if (info_fields == NULL) { if (info_fields == NULL) {
EMBB_THROW(embb::base::ErrorException, EMBB_THROW(embb::base::ErrorException,
"Could not allocate SCX record list"); "Could not allocate SCX record list");
} }
dr_list_t::const_iterator it; dr_list_t::const_iterator data_record;
dr_list_t::const_iterator end; dr_list_t::const_iterator end;
end = linked_deps.end(); end = linked_deps.end();
// Copy SCX operation of all LLX results of link dependencies into a list. // Copy SCX operation of all LLX results of link dependencies into a list.
// For each r in linked_deps ... // For each r in linked_deps ...
for (it = linked_deps.begin(); it != end; ++it) { for (data_record = linked_deps.begin(); data_record != end; ++data_record) {
typedef embb::containers::internal::FixedSizeList<LlxResult> llx_result_list_t::iterator llx_result_it =
llx_result_list; thread_llx_results_[thread_id]->begin();
llx_result_list::iterator llx_result_it; llx_result_list_t::iterator llx_result_end =
llx_result_list::iterator llx_result_end; thread_llx_results_[thread_id]->end();
llx_result_end = thread_llx_results_[thread_id]->end();
// Find LLX result of data_record (r) in thread-local LLX results: // Find LLX result of data_record (r) in thread-local LLX results:
for (llx_result_it = thread_llx_results_[thread_id]->begin(); while (llx_result_it != llx_result_end &&
llx_result_it != llx_result_end && llx_result_it->data_record != *data_record) {
llx_result_it->data_record != *it; ++llx_result_it;
++llx_result_it); }
if (llx_result_it->data_record != *it) { if (llx_result_it->data_record != *data_record) {
// Missing LLX result for given linked data record, user did not // Missing LLX result for given linked data record, user did not
// load-link a data record this SCX depends on. // load-link a data record this SCX depends on.
EMBB_THROW(embb::base::ErrorException, EMBB_THROW(embb::base::ErrorException,
...@@ -238,8 +241,37 @@ bool LlxScx<UserData, ValuePool>::TryStoreConditional( ...@@ -238,8 +241,37 @@ bool LlxScx<UserData, ValuePool>::TryStoreConditional(
template< typename UserData, typename ValuePool > template< typename UserData, typename ValuePool >
bool LlxScx<UserData, ValuePool>::TryValidateLink( bool LlxScx<UserData, ValuePool>::TryValidateLink(
const DataRecord_t & field) { embb::containers::internal::FixedSizeList<DataRecord_t *> & linked_deps) {
return true; // @TODO typedef embb::containers::internal::FixedSizeList<DataRecord_t *>
dr_list_t;
typedef embb::containers::internal::FixedSizeList<LlxResult>
llx_result_list_t;
unsigned int thread_id = ThreadId();
// Iterate over given list of data records V:
dr_list_t::iterator linked_it = linked_deps.begin();
dr_list_t::iterator linked_end = linked_deps.end();
// For each r in V ...
for (; linked_it != linked_end; ++linked_it) {
llx_result_list_t::iterator llx_result_it =
thread_llx_results_[thread_id]->begin();
llx_result_list_t::iterator llx_result_end =
thread_llx_results_[thread_id]->end();
// Find LLX result of data_record (r) in thread-local LLX results:
while (llx_result_it != llx_result_end &&
llx_result_it->data_record != *linked_it) {
++llx_result_it;
}
if (llx_result_it->data_record != *linked_it) {
// Missing LLX result for given linked data record, user did not
// load-link a data record this SCX depends on.
EMBB_THROW(embb::base::ErrorException,
"Missing preceding LLX on a data record used as SCX dependency");
}
if (llx_result_it->scx_record != (*linked_it)->ScxInfo()) {
return false;
}
}
return true;
} }
// ScxRecord // ScxRecord
...@@ -257,10 +289,10 @@ bool LlxScx<UserData, ValuePool>::Help( ...@@ -257,10 +289,10 @@ bool LlxScx<UserData, ValuePool>::Help(
// Freeze all data records in data_records (i.e. reserve them for this // Freeze all data records in data_records (i.e. reserve them for this
// SCX operation) to protect their mutable fields from being changed by // SCX operation) to protect their mutable fields from being changed by
// other SCXs: // other SCXs:
dr_list_t::iterator linked_it = scx->linked_data_records_->begin(); dr_list_t::iterator linked_it = scx->linked_data_records->begin();
dr_list_t::iterator linked_end = scx->linked_data_records_->end(); dr_list_t::iterator linked_end = scx->linked_data_records->end();
op_list_t::iterator scx_op_it = scx->scx_ops_->begin(); op_list_t::iterator scx_op_it = scx->scx_ops->begin();
op_list_t::iterator scx_op_end = scx->scx_ops_->end(); op_list_t::iterator scx_op_end = scx->scx_ops->end();
for (; linked_it != linked_end && scx_op_it != scx_op_end; for (; linked_it != linked_end && scx_op_it != scx_op_end;
++linked_it, ++scx_op_it) { ++linked_it, ++scx_op_it) {
DataRecord_t * r = *linked_it; DataRecord_t * r = *linked_it;
...@@ -278,44 +310,44 @@ bool LlxScx<UserData, ValuePool>::Help( ...@@ -278,44 +310,44 @@ bool LlxScx<UserData, ValuePool>::Help(
} else { } else {
if (r->ScxInfo().Load() != scx) { if (r->ScxInfo().Load() != scx) {
// could not freeze r because it is frozen for another SCX: // could not freeze r because it is frozen for another SCX:
if (scx->all_frozen_) { if (scx->all_frozen) {
// SCX already completed by any other thread: // SCX already completed by any other thread:
return true; return true;
} }
// Atomically unfreeze all nodes frozen for this SCX (see LLX): // Atomically unfreeze all nodes frozen for this SCX (see LLX):
scx->state_ = ScxRecord_t::Aborted; scx->state = ScxRecord_t::Aborted;
return false; return false;
} }
} }
} }
// finished freezing data records // finished freezing data records
assert(scx->state_ == ScxRecord_t::InProgress || assert(scx->state == ScxRecord_t::InProgress ||
scx->state_ == ScxRecord_t::Comitted); scx->state == ScxRecord_t::Comitted);
// frozen step: // frozen step:
scx->all_frozen_ = true; scx->all_frozen = true;
// mark step: // mark step:
dr_list_t::iterator finalize_it = scx->finalize_data_records_->begin(); dr_list_t::iterator finalize_it = scx->finalize_data_records->begin();
dr_list_t::iterator finalize_end = scx->finalize_data_records_->end(); dr_list_t::iterator finalize_end = scx->finalize_data_records->end();
for (; finalize_it != finalize_end; ++finalize_it) { for (; finalize_it != finalize_end; ++finalize_it) {
(*finalize_it)->MarkForFinalize(); (*finalize_it)->MarkForFinalize();
} }
// update CAS: // update CAS:
cas_t expected_old_value = scx->old_value_; cas_t expected_old_value = scx->old_value;
// scx->old_value_ is not an ABA hazard as it is local to the instance // scx->old_value_ is not an ABA hazard as it is local to the instance
// scx which is already guarded. // scx which is already guarded.
scx->field_->CompareAndSwap(expected_old_value, scx->new_value_); scx->field->CompareAndSwap(expected_old_value, scx->new_value);
// Commit step. // Commit step.
// Finalizes all r in data_records within finalize range and // Finalizes all r in data_records within finalize range and
// unfreezes all r in data_records outside of finalize range. // unfreezes all r in data_records outside of finalize range.
// Linearization point of this operation. // Linearization point of this operation.
scx->state_ = ScxRecord_t::Comitted; scx->state = ScxRecord_t::Comitted;
return true; return true;
} }
template< typename UserData, typename ValuePool > template< typename UserData, typename ValuePool >
void LlxScx<UserData, ValuePool>::DeleteOperationCallback( void LlxScx<UserData, ValuePool>::DeleteOperationCallback(
ScxRecord_t * scx_record) { ScxRecord_t * scx_record) {
scx_record_list_pool_.Free(scx_record->scx_ops_); scx_record_list_pool_.Free(scx_record->scx_ops);
scx_record_pool_.Free(scx_record); scx_record_pool_.Free(scx_record);
} }
......
...@@ -68,14 +68,14 @@ class ScxRecord { ...@@ -68,14 +68,14 @@ class ScxRecord {
* Default constructor, creates sentinel instance of ScxRecord. * Default constructor, creates sentinel instance of ScxRecord.
*/ */
ScxRecord() ScxRecord()
: linked_data_records_(0), : linked_data_records(0),
finalize_data_records_(0), finalize_data_records(0),
new_value_(0), new_value(0),
old_value_(0), old_value(0),
scx_ops_(0), scx_ops(0),
state_(OperationState::Comitted), state(OperationState::Comitted),
all_frozen_(false) { all_frozen(false) {
field_ = 0; field = 0;
} }
/** /**
...@@ -86,76 +86,72 @@ class ScxRecord { ...@@ -86,76 +86,72 @@ class ScxRecord {
linked_data_records, linked_data_records,
FixedSizeList<DataRecord *> & FixedSizeList<DataRecord *> &
finalize_data_records, finalize_data_records,
embb::base::Atomic<cas_t> * field, embb::base::Atomic<cas_t> * target_field,
cas_t new_value, cas_t new_value,
cas_t old_value, cas_t old_value,
FixedSizeList<self_t *> * scx_ops, FixedSizeList<self_t *> * scx_ops,
OperationState operation_state) OperationState operation_state)
: linked_data_records_(&linked_data_records), : linked_data_records(&linked_data_records),
finalize_data_records_(&finalize_data_records), finalize_data_records(&finalize_data_records),
new_value_(new_value), new_value(new_value),
old_value_(old_value), old_value(old_value),
scx_ops_(scx_ops), scx_ops(scx_ops),
state_(operation_state), state(operation_state),
all_frozen_(false) { all_frozen(false) {
field_ = field; field = target_field;
} }
OperationState State() const {
return state_;
}
public: public:
/** /**
* Sequence of load-linked data records for this SCX operation. * Sequence of load-linked data records for this SCX operation.
* Named 'V' in original publication. * Named 'V' in original publication.
*/ */
const FixedSizeList<DataRecord *> * const FixedSizeList<DataRecord *> *
linked_data_records_; linked_data_records;
/** /**
* Sequence of data records to be finalized in this SCX operation. * Sequence of data records to be finalized in this SCX operation.
* Named 'R' in original publication. * Named 'R' in original publication.
*/ */
const FixedSizeList<DataRecord *> * const FixedSizeList<DataRecord *> *
finalize_data_records_; finalize_data_records;
/** /**
* Pointer to a mutable field of a data record in data_records the * Pointer to a mutable field of a data record in data_records the
* new value is to be stored. * new value is to be stored.
* Named 'fld' in original publication. * Named 'fld' in original publication.
*/ */
embb::base::Atomic<cas_t> * field_; embb::base::Atomic<cas_t> * field;
/** /**
* Value to be written in field referenced by field_index. * Value to be written in field referenced by field_index.
* Required to be compatible with atomic operations. * Required to be compatible with atomic operations.
*/ */
cas_t new_value_; cas_t new_value;
/** /**
* Value previously read from field referenced by field_index. * Value previously read from field referenced by field_index.
* Required to be compatible with atomic operations. * Required to be compatible with atomic operations.
*/ */
cas_t old_value_; cas_t old_value;
/** /**
* List of SCX operation descriptions associated with data records * List of SCX operation descriptions associated with data records
* linked with this SCX operation. * linked with this SCX operation.
*/ */
FixedSizeList<self_t *> * scx_ops_; FixedSizeList<self_t *> * scx_ops;
/** /**
* Current state of this SCX record. * Current state of this SCX record.
*/ */
OperationState state_; OperationState state;
/** /**
* Whether all fields are currently frozen, initially false. * Whether all fields are currently frozen, initially false.
* Set to true after all data records in data_records V have * Set to true after all data records in data_records V have
* been frozen for the SCX. * been frozen for the SCX.
*/ */
bool all_frozen_; bool all_frozen;
}; /* class ScxRecord */ }; /* class ScxRecord */
...@@ -223,7 +219,7 @@ class LlxScxRecord { ...@@ -223,7 +219,7 @@ class LlxScxRecord {
UserData & Data() { UserData & Data() {
return user_data_; return user_data_;
} }
/*
UserData * operator*() { UserData * operator*() {
return &user_data_; return &user_data_;
} }
...@@ -231,7 +227,7 @@ class LlxScxRecord { ...@@ -231,7 +227,7 @@ class LlxScxRecord {
UserData * operator->() { UserData * operator->() {
return &user_data_; return &user_data_;
} }
*/
/** /**
* A data record r is frozen for an SCX-record U if r.info points to * A data record r is frozen for an SCX-record U if r.info points to
* U and either U.state is InProgress, or U.state is Committed and r * U and either U.state is InProgress, or U.state is Committed and r
...@@ -369,17 +365,17 @@ class LlxScx { ...@@ -369,17 +365,17 @@ class LlxScx {
); );
/** /**
* Performs a VLX (extended validate link) operation on given LLX data * Performs a VLX (extended validate link) operation on list of given LLX
* record. * data records.
* Before calling this method, the given LLX/SCX record must have been * Before calling this method, the given LLX/SCX records must have been
* linked via \c TryLoadLinked. * linked via \c TryLoadLinked.
* *
* \returns True if the calling thread's link obtained by its most recent * \returns True if the calling thread's links obtained by its most recent
* invocation of SCX is still valid. * invocations of SCX is still valid.
*/ */
bool TryValidateLink( bool TryValidateLink(
const DataRecord_t & data_record embb::containers::internal::FixedSizeList<DataRecord_t *> & linked_deps
/**< [IN] Linked data record to validate */ /**< [IN] Linked data records to validate */
); );
private: private:
......
...@@ -157,6 +157,8 @@ void LlxScxTest::SerialArrayTest() { ...@@ -157,6 +157,8 @@ void LlxScxTest::SerialArrayTest() {
links.PushBack(&r2); links.PushBack(&r2);
links.PushBack(&r3); links.PushBack(&r3);
PT_ASSERT(llxscx.TryValidateLink(links));
FixedSizeList< LlxScxRecord<Payload> * > FixedSizeList< LlxScxRecord<Payload> * >
finalize_links(1); finalize_links(1);
finalize_links.PushBack(&r3); finalize_links.PushBack(&r3);
...@@ -169,6 +171,8 @@ void LlxScxTest::SerialArrayTest() { ...@@ -169,6 +171,8 @@ void LlxScxTest::SerialArrayTest() {
finalize_links)); finalize_links));
// New value should have been changed successfully: // New value should have been changed successfully:
PT_ASSERT_EQ(field.Load(), a); PT_ASSERT_EQ(field.Load(), a);
PT_ASSERT(!llxscx.TryValidateLink(links));
} }
} // namespace test } // namespace test
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment