Commit 746bd2fc by Tobias Fuchs

containers_cpp: fixes of reclamation in LlxScx

parent fb4b4604
......@@ -111,14 +111,14 @@ bool LlxScx<UserData, ValuePool>::TryLoadLinked(
}
if (marked_1 &&
(curr_scx->State() == OperationState::Comitted ||
(curr_scx->State() == OperationState::InProgress && curr_scx->Help()))) {
(curr_scx->State() == OperationState::InProgress && Help(curr_scx)))) {
// Successfully completed active SCX:
finalized = true;
return false;
}
if (data_record->ScxInfo().Load()->State() == OperationState::InProgress) {
// Help active SCX:
data_record->ScxInfo().Load()->Help();
Help(data_record->ScxInfo().Load());
}
return false;
}
......@@ -212,10 +212,10 @@ bool LlxScx<UserData, ValuePool>::TryStoreConditionalCAS(
EMBB_THROW(embb::base::ErrorException,
"Missing preceding LLX on a data record used for SCX");
}
// ScxRecord_t scx_op(*(l_it->data_record->ScxInfo().Load()));
info_fields->PushBack(
l_it->data_record->ScxInfo().Load());
}
thread_llx_results_[thread_id]->clear();
// Announce SCX operation. Lists linked_deps and finalize_dep are
// guaranteed to remain on the stack until this announced operation
// is completed, so no allocation/pool is necessary.
......@@ -234,7 +234,7 @@ bool LlxScx<UserData, ValuePool>::TryStoreConditionalCAS(
OperationState::InProgress);
// Allocate from pool as this operation description is global:
ScxRecord_t * scx = scx_record_pool_.Allocate(new_scx);
return scx->Help();
return Help(scx);
}
template< typename UserData, typename ValuePool >
......@@ -243,82 +243,87 @@ bool LlxScx<UserData, ValuePool>::TryValidateLink(
return true; // @TODO
}
// LlxScxRecord
// ScxRecord
template< typename UserData >
LlxScxRecord<UserData>::LlxScxRecord()
: marked_for_finalize_(false) {
scx_op_.Store(&dummy_scx);
}
template< typename UserData >
LlxScxRecord<UserData>::LlxScxRecord(
const UserData & user_data)
: user_data_(user_data),
marked_for_finalize_(false) {
scx_op_.Store(&dummy_scx);
}
// internal::ScxRecord
template< typename DataRecord >
bool ScxRecord<DataRecord>::Help() {
template< typename UserData, typename ValuePool >
bool LlxScx<UserData, ValuePool>::Help(
ScxRecord_t * scx) {
// We ensure that an SCX S does not change a data record
// while it is frozen for another SCX S'. Instead, S uses
// the information in the SCX record of S' to help S'
// complete, so that the data record can be unfrozen.
typedef embb::containers::internal::FixedSizeList<DataRecord *> dr_list_t;
typedef embb::containers::internal::FixedSizeList<self_t *> op_list_t;
typedef embb::containers::internal::FixedSizeList<DataRecord_t *> dr_list_t;
typedef embb::containers::internal::FixedSizeList<ScxRecord_t *> op_list_t;
// Freeze all data records in data_records (i.e. reserve them for this
// SCX operation) to protect their mutable fields from being changed by
// other SCXs:
dr_list_t::iterator linked_it = linked_data_records_->begin();
dr_list_t::iterator linked_end = linked_data_records_->end();
op_list_t::iterator scx_op_it = scx_ops_->begin();
op_list_t::iterator scx_op_end = scx_ops_->end();
dr_list_t::iterator linked_it = scx->linked_data_records_->begin();
dr_list_t::iterator linked_end = scx->linked_data_records_->end();
op_list_t::iterator scx_op_it = scx->scx_ops_->begin();
op_list_t::iterator scx_op_end = scx->scx_ops_->end();
for (; linked_it != linked_end && scx_op_it != scx_op_end;
++linked_it, ++scx_op_it) {
DataRecord * r = *linked_it;
ScxRecord<DataRecord> * rinfo_old = *scx_op_it;
DataRecord_t * r = *linked_it;
ScxRecord<DataRecord_t> * rinfo_old = *scx_op_it;
// Try to freeze the data record by setting its SCX info field
// to this SCX operation description:
if (!r->ScxInfo().CompareAndSwap(rinfo_old, this)) {
if (r->ScxInfo().Load() != this) {
if (!r->ScxInfo().CompareAndSwap(rinfo_old, scx)) {
if (r->ScxInfo().Load() != scx) {
// could not freeze r because it is frozen for another SCX:
if (all_frozen_) {
if (scx->all_frozen_) {
// SCX already completed:
return true;
}
// Atomically unfreeze all nodes frozen for this SCX (see LLX):
state_ = Aborted;
scx->state_ = ScxRecord_t::Aborted;
return false;
}
} else {
// Do not try to delete the sentinel scx record:
if (rinfo_old->field_ != 0) {
scx_record_list_pool_.Free(rinfo_old->scx_ops_);
scx_record_pool_.Free(rinfo_old);
}
else {
// free_scx_ops.PushBack(rinfo_old);
}
}
// finished freezing data records
assert(state_ == InProgress || state_ == Comitted);
assert(scx->state_ == ScxRecord_t::InProgress ||
scx->state_ == ScxRecord_t::Comitted);
// frozen step:
all_frozen_ = true;
scx->all_frozen_ = true;
// mark step:
dr_list_t::iterator finalize_it = finalize_data_records_->begin();
dr_list_t::iterator finalize_end = finalize_data_records_->end();
dr_list_t::iterator finalize_it = scx->finalize_data_records_->begin();
dr_list_t::iterator finalize_end = scx->finalize_data_records_->end();
for (; finalize_it != finalize_end; ++finalize_it) {
(*finalize_it)->MarkForFinalize();
}
// update CAS:
cas_t expected_old_value = old_value_;
field_->CompareAndSwap(expected_old_value, new_value_);
cas_t expected_old_value = scx->old_value_;
scx->field_->CompareAndSwap(expected_old_value, scx->new_value_);
// Commit step.
// Finalizes all r in data_records within finalize range and
// unfreezes all r in data_records outside of finalize range.
// Linearization point of this operation.
state_ = Comitted;
scx->state_ = ScxRecord_t::Comitted;
return true;
}
// LlxScxRecord
template< typename UserData >
LlxScxRecord<UserData>::LlxScxRecord()
: marked_for_finalize_(false) {
scx_op_.Store(&dummy_scx);
}
template< typename UserData >
LlxScxRecord<UserData>::LlxScxRecord(
const UserData & user_data)
: user_data_(user_data),
marked_for_finalize_(false) {
scx_op_.Store(&dummy_scx);
}
template< typename UserData >
ScxRecord< LlxScxRecord<UserData> >
LlxScxRecord<UserData>::dummy_scx =
......
......@@ -29,6 +29,7 @@
#include <embb/base/thread.h>
#include <embb/base/atomic.h>
#include <embb/base/function.h>
#include <embb/base/thread_specific_storage.h>
#include <embb/containers/object_pool.h>
#include <embb/containers/lock_free_tree_value_pool.h>
......@@ -245,14 +246,14 @@ class ScxRecord {
* Constructor.
*/
ScxRecord(
embb::containers::internal::FixedSizeList<DataRecord *> &
FixedSizeList<DataRecord *> &
linked_data_records,
embb::containers::internal::FixedSizeList<DataRecord *> &
FixedSizeList<DataRecord *> &
finalize_data_records,
embb::base::Atomic<cas_t> * field,
cas_t new_value,
cas_t old_value,
embb::containers::internal::FixedSizeList<self_t *> * scx_ops,
FixedSizeList<self_t *> * scx_ops,
OperationState operation_state)
: linked_data_records_(&linked_data_records),
finalize_data_records_(&finalize_data_records),
......@@ -268,24 +269,19 @@ class ScxRecord {
return state_;
}
/**
* Returns true if helped operation has been completed.
*/
bool Help();
private:
public:
/**
* Sequence of load-linked data records for this SCX operation.
* Named 'V' in original publication.
*/
const embb::containers::internal::FixedSizeList<DataRecord *> *
const FixedSizeList<DataRecord *> *
linked_data_records_;
/**
* Sequence of data records to be finalized in this SCX operation.
* Named 'R' in original publication.
*/
const embb::containers::internal::FixedSizeList<DataRecord *> *
const FixedSizeList<DataRecord *> *
finalize_data_records_;
/**
......@@ -311,7 +307,7 @@ class ScxRecord {
* List of SCX operation descriptions associated with data records
* linked with this SCX operation.
*/
embb::containers::internal::FixedSizeList<self_t *> * scx_ops_;
FixedSizeList<self_t *> * scx_ops_;
/**
* Current state of this SCX record.
......@@ -649,6 +645,8 @@ class LlxScx {
*/
LlxScx & operator=(const LlxScx &);
bool Help(ScxRecord_t * scx);
/**
* Actual implementation of StoreConditional operating on unified fields/values
* of type cas_t.
......
......@@ -41,21 +41,62 @@ LlxScxTest::LlxScxTest() :
static_cast<int>(partest::TestSuite::GetDefaultNumThreads())),
llxscx_(3),
tail(0, '-'),
head(0, '-', Node::node_ptr_t(&tail)) {
head(0, '-'),
tail_llx(tail),
head_llx(head) {
CreateUnit("SerialArrayTest").Add(&LlxScxTest::SerialArrayTest, this);
CreateUnit("SerialListTest").Add(&LlxScxTest::SerialListTest, this);
CreateUnit("ParallelTest").Add(&LlxScxTest::ParallelTest, this);
// CreateUnit("ParallelTest")
// .Add(&LlxScxTest::ParallelTest, this)
// .Post(&LlxScxTest::ParallelTestPost, this);
}
void LlxScxTest::ParallelTest() {
typedef LlxScxTest::Node Node;
unsigned int thread_index;
int return_val = embb_internal_thread_index(&thread_index);
if (return_val != EMBB_SUCCESS)
EMBB_THROW(embb::base::ErrorException, "Could not get thread id!");
// Threads try to append n nodes to a linked list in parallel
for (char value = 'a'; value <= 'z';) {
// Find node to append new element on:
internal::LlxScxRecord<Node> * node = &head_llx;
internal::LlxScxRecord<Node> * next = node->Data().next_;
while (next != 0 && next->Data().value_ < value) {
node = next;
next = next->Data().next_;
}
Node n;
llxscx_.TryLoadLinked(node, n);
if (n.next_ == next) {
// Pointer still valid after LLX, call SCX(node, node.next, new_node)
internal::FixedSizeList<LlxScxRecord<Node> *> linked_deps(1);
linked_deps.PushBack(node);
// Create new node:
Node new_node(static_cast<int>(thread_index), value);
internal::LlxScxRecord<Node> * new_node_ptr =
new internal::LlxScxRecord<Node>(new_node);
bool element_inserted =
llxscx_.TryStoreConditional(
&(node->Data().next_),
new_node_ptr,
linked_deps);
if (element_inserted) {
// Value has been added to list, continue with next value
++value;
}
}
}
}
void LlxScxTest::ParallelTestPost() {
internal::LlxScxRecord<Node> * node = &head_llx;
internal::LlxScxRecord<Node> * next = head_llx.Data().next_;
while (next != 0) {
delete node;
node = next;
next = next->Data().next_;
}
}
void LlxScxTest::SerialArrayTest() {
......@@ -67,7 +108,7 @@ void LlxScxTest::SerialArrayTest() {
// Atomic<size_t> not assignable, TryStoreConditional requires
// a specialization for atomics that uses a.Store(b.Load()).
AtomicField field(23);
// Initialize
LlxScxRecord< Payload > * my_list =
new LlxScxRecord<Payload>[10];
for (int i = 0; i != 10; ++i) {
......
......@@ -91,11 +91,14 @@ class LlxScxTest : public partest::TestCase {
void SerialArrayTest();
void SerialListTest();
void ParallelTest();
void ParallelTestPost();
int num_threads_;
internal::LlxScx<Node> llxscx_;
Node tail;
Node head;
internal::LlxScxRecord<Node> tail_llx;
internal::LlxScxRecord<Node> head_llx;
};
} // namespace test
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment