Commit 92213745 by Tobias Fuchs

containers_cpp: added comments on implementation of LlxScx

parent 3cf586b9
...@@ -99,15 +99,23 @@ bool LlxScx<UserData, ValuePool>::TryLoadLinked( ...@@ -99,15 +99,23 @@ bool LlxScx<UserData, ValuePool>::TryLoadLinked(
// Order of initialization matters: // Order of initialization matters:
volatile bool marked_1 = data_record->IsMarkedForFinalize(); volatile bool marked_1 = data_record->IsMarkedForFinalize();
ScxRecord_t * curr_scx = data_record->ScxInfo().Load(); ScxRecord_t * curr_scx = data_record->ScxInfo().Load();
// Guard active SCX record of this data record using guard 1.
// When calling help with this SCX record, it will be guarded
// using guard 0 again.
// This hazard pointer is validated in the nested if-block below.
hp.GuardPointer(1, curr_scx);
volatile OperationState curr_state = curr_scx->State(); volatile OperationState curr_state = curr_scx->State();
bool marked_2 = data_record->IsMarkedForFinalize(); bool marked_2 = data_record->IsMarkedForFinalize();
if (curr_state == OperationState::Aborted || if (curr_state == OperationState::Aborted ||
(curr_state == OperationState::Comitted && !marked_2)) { (curr_state == OperationState::Comitted && !marked_2)) {
// read mutable fields into local variable: // read mutable fields into local variable:
UserData user_data_local(data_record->Data()); UserData user_data_local(data_record->Data());
if (data_record->ScxInfo().Load() == curr_scx) { if (data_record->ScxInfo().Load() == curr_scx) {
// store <r, curr_scx, user_data_local> in // Active SCX record of data record did not change.
// the thread-specific table: // Store <r, curr_scx, user_data_local> in
// the thread-specific table.
// LLX results do not need to be guarded as they local to the
// thread.
LlxResult llx_result; LlxResult llx_result;
llx_result.data_record = data_record; llx_result.data_record = data_record;
llx_result.scx_record = curr_scx; llx_result.scx_record = curr_scx;
...@@ -118,16 +126,20 @@ bool LlxScx<UserData, ValuePool>::TryLoadLinked( ...@@ -118,16 +126,20 @@ bool LlxScx<UserData, ValuePool>::TryLoadLinked(
return true; return true;
} }
} }
// Active SCX record of data record has been changed in between
if (marked_1 && if (marked_1 &&
(curr_scx->State() == OperationState::Comitted || (curr_scx->State() == OperationState::Comitted ||
(curr_scx->State() == OperationState::InProgress && Help(curr_scx)))) { (curr_scx->State() == OperationState::InProgress && Help(curr_scx)))) {
// Successfully completed active SCX: // Successfully completed the data record's active SCX but failed to
// complete the LLX operation because the data record has been finalized:
finalized = true; finalized = true;
return false; return false;
} }
if (data_record->ScxInfo().Load()->State() == OperationState::InProgress) { if (data_record->ScxInfo().Load()->State() == OperationState::InProgress) {
// Help active SCX: // Help active SCX.
Help(data_record->ScxInfo().Load()); // This SCX record has been guarded above.
ScxRecord_t * data_record_scx = data_record->ScxInfo().Load();
Help(data_record_scx);
} }
return false; return false;
} }
...@@ -157,6 +169,7 @@ bool LlxScx<UserData, ValuePool>::TryStoreConditional( ...@@ -157,6 +169,7 @@ bool LlxScx<UserData, ValuePool>::TryStoreConditional(
// Let info_fields be a table in shared memory containing for each r in V, // Let info_fields be a table in shared memory containing for each r in V,
// a copy of r's info value in this threads local table of LLX results. // a copy of r's info value in this threads local table of LLX results.
// Will be freed in Help() once the SCX operation has been completed. // Will be freed in Help() once the SCX operation has been completed.
// In brief: A list of the SCX record of all linked deps.
scx_op_list_t * info_fields = scx_record_list_pool_.Allocate(max_links_); scx_op_list_t * info_fields = scx_record_list_pool_.Allocate(max_links_);
if (info_fields == NULL) { if (info_fields == NULL) {
EMBB_THROW(embb::base::ErrorException, EMBB_THROW(embb::base::ErrorException,
...@@ -173,7 +186,7 @@ bool LlxScx<UserData, ValuePool>::TryStoreConditional( ...@@ -173,7 +186,7 @@ bool LlxScx<UserData, ValuePool>::TryStoreConditional(
llx_result_list::iterator llx_result_it; llx_result_list::iterator llx_result_it;
llx_result_list::iterator llx_result_end; llx_result_list::iterator llx_result_end;
llx_result_end = thread_llx_results_[thread_id]->end(); llx_result_end = thread_llx_results_[thread_id]->end();
// Find LLX result of r in thread-local LLX results: // Find LLX result of data_record (r) in thread-local LLX results:
for (llx_result_it = thread_llx_results_[thread_id]->begin(); for (llx_result_it = thread_llx_results_[thread_id]->begin();
llx_result_it != llx_result_end && llx_result_it != llx_result_end &&
llx_result_it->data_record != *it; llx_result_it->data_record != *it;
...@@ -193,6 +206,9 @@ bool LlxScx<UserData, ValuePool>::TryStoreConditional( ...@@ -193,6 +206,9 @@ bool LlxScx<UserData, ValuePool>::TryStoreConditional(
// Announce SCX operation. Lists linked_deps and finalize_dep are // Announce SCX operation. Lists linked_deps and finalize_dep are
// guaranteed to remain on the stack until this announced operation // guaranteed to remain on the stack until this announced operation
// is completed, so no allocation/pool is necessary. // is completed, so no allocation/pool is necessary.
// The SCX operation description must be allocated from a pool as
// LLX data records might reference it after this operation has been
// completed.
ScxRecord_t new_scx( ScxRecord_t new_scx(
linked_deps, linked_deps,
finalize_deps, finalize_deps,
...@@ -202,17 +218,21 @@ bool LlxScx<UserData, ValuePool>::TryStoreConditional( ...@@ -202,17 +218,21 @@ bool LlxScx<UserData, ValuePool>::TryStoreConditional(
cas_value, cas_value,
// old value: // old value:
cas_field->Load(), cas_field->Load(),
// linked SCX operations: // list of the SCX record of all linked deps
info_fields, info_fields,
// initial operation state: // initial operation state:
OperationState::InProgress); OperationState::InProgress);
// Allocate from pool as this operation description is global: // Allocate from pool as this operation description is global:
ScxRecord_t * scx = scx_record_pool_.Allocate(new_scx); ScxRecord_t * scx = scx_record_pool_.Allocate(new_scx);
// Try to complete the operation. It will also be helped in failing
// TryLoadLinked operations.
bool result = Help(scx); bool result = Help(scx);
hp.GuardPointer(0, NULL); // Release all load-links this SCX operation depended on
hp.GuardPointer(1, NULL);
ClearLinks(); ClearLinks();
hp.EnqueuePointerForDeletion(scx); // Release guards, but do not enqueue instance scx for deletion as it
// is still referenced in data records.
hp.GuardPointer(0, NULL);
hp.GuardPointer(1, NULL);;
return result; return result;
} }
...@@ -247,7 +267,9 @@ bool LlxScx<UserData, ValuePool>::Help( ...@@ -247,7 +267,9 @@ bool LlxScx<UserData, ValuePool>::Help(
ScxRecord<DataRecord_t> * rinfo_old = *scx_op_it; ScxRecord<DataRecord_t> * rinfo_old = *scx_op_it;
hp.GuardPointer(1, rinfo_old); hp.GuardPointer(1, rinfo_old);
// Try to freeze the data record by setting its SCX info field // Try to freeze the data record by setting its SCX info field
// to this SCX operation description: // to this SCX operation description.
// r->ScxInfo() is not an ABA hazard as it is local to instance scx
// which is already guarded.
if (r->ScxInfo().CompareAndSwap(rinfo_old, scx)) { if (r->ScxInfo().CompareAndSwap(rinfo_old, scx)) {
// Do not try to delete the sentinel scx record: // Do not try to delete the sentinel scx record:
if (rinfo_old != &DataRecord_t::DummyScx) { if (rinfo_old != &DataRecord_t::DummyScx) {
...@@ -279,6 +301,8 @@ bool LlxScx<UserData, ValuePool>::Help( ...@@ -279,6 +301,8 @@ bool LlxScx<UserData, ValuePool>::Help(
} }
// update CAS: // update CAS:
cas_t expected_old_value = scx->old_value_; cas_t expected_old_value = scx->old_value_;
// scx->old_value_ is not an ABA hazard as it is local to the instance
// scx which is already guarded.
scx->field_->CompareAndSwap(expected_old_value, scx->new_value_); scx->field_->CompareAndSwap(expected_old_value, scx->new_value_);
// Commit step. // Commit step.
// Finalizes all r in data_records within finalize range and // Finalizes all r in data_records within finalize range and
......
...@@ -63,28 +63,30 @@ void LlxScxTest::ParallelTest() { ...@@ -63,28 +63,30 @@ void LlxScxTest::ParallelTest() {
int return_val = embb_internal_thread_index(&thread_index); int return_val = embb_internal_thread_index(&thread_index);
if (return_val != EMBB_SUCCESS) if (return_val != EMBB_SUCCESS)
EMBB_THROW(embb::base::ErrorException, "Could not get thread id!"); EMBB_THROW(embb::base::ErrorException, "Could not get thread id!");
// Threads try to append n nodes to a linked list in parallel // Every thread adds every character from 'a' to 'z' into an ordered
for (char value = 'a'; value <= 'p';) { // linked list
for (char value = 'a'; value <= 'z';) {
// Find node to append new element on: // Find node to append new element on:
internal::LlxScxRecord<Node> * node = &head_llx; internal::LlxScxRecord<Node> * node_rec = &head_llx;
internal::LlxScxRecord<Node> * next = node->Data().next_; internal::LlxScxRecord<Node> * next_rec = node_rec->Data().next_;
while (next != 0 && next->Data().value_ < value) { while (next_rec != 0 && next_rec->Data().value_ < value) {
node = next; node_rec = next_rec;
next = next->Data().next_; next_rec = next_rec->Data().next_;
} }
Node n; Node node;
bool finalized; bool finalized;
// LLX on node the new element will be appended to: // LLX on node the new element will be appended to:
if (!llxscx_.TryLoadLinked(node, n, finalized)) { if (!llxscx_.TryLoadLinked(node_rec, node, finalized)) {
continue; continue;
} }
if (n.next_ == next) { PT_ASSERT_MSG(!finalized, "No node should be finalized");
if (node.next_ == next_rec) {
// Pointer still valid after LLX, try to append new node // Pointer still valid after LLX, try to append new node
internal::FixedSizeList<LlxScxRecord<Node> *> linked_deps(1); internal::FixedSizeList<LlxScxRecord<Node> *> linked_deps(1);
internal::FixedSizeList<LlxScxRecord<Node> *> finalize_deps(0); internal::FixedSizeList<LlxScxRecord<Node> *> finalize_deps(0);
linked_deps.PushBack(node); linked_deps.PushBack(node_rec);
// Create new node: // Create new node:
Node new_node(static_cast<int>(thread_index), value, next); Node new_node(static_cast<int>(thread_index), value, next_rec);
internal::LlxScxRecord<Node> * new_node_ptr = internal::LlxScxRecord<Node> * new_node_ptr =
new internal::LlxScxRecord<Node>(new_node); new internal::LlxScxRecord<Node>(new_node);
// Convert node pointer to size_t: // Convert node pointer to size_t:
...@@ -92,7 +94,7 @@ void LlxScxTest::ParallelTest() { ...@@ -92,7 +94,7 @@ void LlxScxTest::ParallelTest() {
// Convert target field pointer to size_t*: // Convert target field pointer to size_t*:
embb::base::Atomic<size_t> * field_cas_ptr = embb::base::Atomic<size_t> * field_cas_ptr =
reinterpret_cast< embb::base::Atomic<size_t> * >( reinterpret_cast< embb::base::Atomic<size_t> * >(
&(node->Data().next_)); &(node_rec->Data().next_));
// Call SCX: // Call SCX:
bool element_inserted = bool element_inserted =
llxscx_.TryStoreConditional( llxscx_.TryStoreConditional(
...@@ -119,7 +121,9 @@ void LlxScxTest::ParallelTestPost() { ...@@ -119,7 +121,9 @@ void LlxScxTest::ParallelTestPost() {
node = next; node = next;
next = next->Data().next_; next = next->Data().next_;
} }
PT_ASSERT_EQ_MSG(static_cast<size_t>(16 * num_threads_), values.size(), // Check if every character from 'a' to 'z' has been added to the
// linked list in the correct order
PT_ASSERT_EQ_MSG(static_cast<size_t>(26 * num_threads_), values.size(),
"Unexpected size of result list"); "Unexpected size of result list");
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment