Commit 95511ee9 by Tobias Fuchs

containers_cpp: fixed allocation in LLX/SCX, cleanup, extending unit tests

parent a3c62f0c
...@@ -49,7 +49,7 @@ unsigned int LlxScx<UserData, ValuePool>::ThreadId() { ...@@ -49,7 +49,7 @@ unsigned int LlxScx<UserData, ValuePool>::ThreadId() {
unsigned int thread_index; unsigned int thread_index;
int return_val = embb_internal_thread_index(&thread_index); int return_val = embb_internal_thread_index(&thread_index);
if (return_val != EMBB_SUCCESS) if (return_val != EMBB_SUCCESS)
EMBB_THROW(embb::base::ErrorException, "Could not get thread id!"); EMBB_THROW(embb::base::ErrorException, "Could not get thread id");
return thread_index; return thread_index;
} }
...@@ -57,19 +57,32 @@ template< typename UserData, typename ValuePool > ...@@ -57,19 +57,32 @@ template< typename UserData, typename ValuePool >
LlxScx<UserData, ValuePool>::LlxScx(size_t max_links) LlxScx<UserData, ValuePool>::LlxScx(size_t max_links)
: max_links_(max_links), : max_links_(max_links),
max_threads_(embb::base::Thread::GetThreadsMaxCount()), max_threads_(embb::base::Thread::GetThreadsMaxCount()),
scx_record_list_pool_(max_threads_) { scx_record_list_pool_(max_threads_),
scx_record_pool_(max_threads_) {
typedef embb::containers::internal::FixedSizeList<LlxResult> typedef embb::containers::internal::FixedSizeList<LlxResult>
llx_result_list_t; llx_result_list_t;
typedef embb::containers::internal::FixedSizeList<ScxRecord_t> typedef embb::containers::internal::FixedSizeList<ScxRecord_t>
scx_record_list_t; scx_record_list_t;
// Allocate a list of LLX results for every thread: // Allocate a list of LLX results for every thread:
thread_llx_results_ = static_cast<llx_result_list_t *>( thread_llx_results_ = static_cast<llx_result_list_t **>(
embb::base::Allocation::AllocateCacheAligned( embb::base::Allocation::AllocateCacheAligned(
max_threads_ * sizeof(llx_result_list_t))); max_threads_ * sizeof(llx_result_list_t*)));
// Using Allocation::New to create every list instance as FixedSizeList
// does not provide a default constructor and Allocation::Allocate does
// not allow constructor arguments.
for (unsigned int thread_idx = 0; thread_idx < max_threads_; ++thread_idx) {
thread_llx_results_[thread_idx] =
embb::base::Allocation::New<llx_result_list_t>(max_links_);
}
} }
template< typename UserData, typename ValuePool > template< typename UserData, typename ValuePool >
LlxScx<UserData, ValuePool>::~LlxScx() { LlxScx<UserData, ValuePool>::~LlxScx() {
// Delete thread-specific lists of LLX results:
for (unsigned int thread_idx = 0; thread_idx < max_threads_; ++thread_idx) {
embb::base::Allocation::Delete(thread_llx_results_[thread_idx]);
}
// Delete array of list pointers:
embb::base::Allocation::FreeAligned(thread_llx_results_); embb::base::Allocation::FreeAligned(thread_llx_results_);
} }
...@@ -89,31 +102,29 @@ bool LlxScx<UserData, ValuePool>::TryLoadLinked( ...@@ -89,31 +102,29 @@ bool LlxScx<UserData, ValuePool>::TryLoadLinked(
(curr_state == OperationState::Comitted && !marked_2)) { (curr_state == OperationState::Comitted && !marked_2)) {
// read mutable fields into local variable: // read mutable fields into local variable:
UserData user_data_local(data_record->Data()); UserData user_data_local(data_record->Data());
if (data_record->ScxInfo() == curr_scx) { if (data_record->ScxInfo().Load() == curr_scx) {
// store <r, curr_scx, user_data_local> in // store <r, curr_scx, user_data_local> in
// the thread-specific table: // the thread-specific table:
LlxResult llx_result; LlxResult llx_result;
llx_result.data_record = data_record; llx_result.data_record = data_record;
llx_result.scx_record = curr_scx; llx_result.scx_record = curr_scx;
llx_result.user_data = user_data_local; llx_result.user_data = user_data_local;
thread_llx_results_[thread_id].PushBack(llx_result); thread_llx_results_[thread_id]->PushBack(llx_result);
// Set return value: // Set return value:
user_data = user_data_local; user_data = user_data_local;
return true; return true;
} }
} }
// @TODO: Re-check if logical precedence is okay here (see paper): if (marked_1 &&
if (curr_scx->State() == OperationState::Comitted || (curr_scx->State() == OperationState::Comitted ||
(curr_scx->State() == OperationState::InProgress && (curr_scx->State() == OperationState::InProgress && curr_scx->Help()))) {
curr_scx->Help() &&
marked_1)) {
// Successfully completed active SCX: // Successfully completed active SCX:
finalized = true; finalized = true;
return false; return false;
} }
if (data_record->ScxInfo()->State() == OperationState::InProgress) { if (data_record->ScxInfo().Load()->State() == OperationState::InProgress) {
// Help active SCX: // Help active SCX:
data_record->ScxInfo()->Help(); data_record->ScxInfo().Load()->Help();
} }
return false; return false;
} }
...@@ -126,7 +137,7 @@ bool LlxScx<UserData, ValuePool>::TryStoreConditional( ...@@ -126,7 +137,7 @@ bool LlxScx<UserData, ValuePool>::TryStoreConditional(
embb::containers::internal::FixedSizeList<DataRecord_t *> & linked_deps, embb::containers::internal::FixedSizeList<DataRecord_t *> & linked_deps,
embb::containers::internal::FixedSizeList<DataRecord_t *> & finalize_deps) { embb::containers::internal::FixedSizeList<DataRecord_t *> & finalize_deps) {
typedef embb::containers::internal::FixedSizeList<DataRecord_t *> dr_list_t; typedef embb::containers::internal::FixedSizeList<DataRecord_t *> dr_list_t;
typedef embb::containers::internal::FixedSizeList<ScxRecord_t> scx_op_list_t; typedef embb::containers::internal::FixedSizeList<ScxRecord_t *> scx_op_list_t;
// Preconditions: // Preconditions:
// 1. For each r in linked_deps, this thread has performed an invocation // 1. For each r in linked_deps, this thread has performed an invocation
// I_r of LLX(r) linked to this SCX. // I_r of LLX(r) linked to this SCX.
...@@ -142,11 +153,10 @@ bool LlxScx<UserData, ValuePool>::TryStoreConditional( ...@@ -142,11 +153,10 @@ bool LlxScx<UserData, ValuePool>::TryStoreConditional(
end = linked_deps.end(); end = linked_deps.end();
// for each r in linked_deps ... // for each r in linked_deps ...
for (it = linked_deps.begin(); it != end; ++it) { for (it = linked_deps.begin(); it != end; ++it) {
// Find LLX result of r in thread-local table of LLX results:
typedef embb::containers::internal::FixedSizeList<LlxResult> typedef embb::containers::internal::FixedSizeList<LlxResult>
llx_result_list; llx_result_list;
llx_result_list::iterator l_it = thread_llx_results_[thread_id].begin(); llx_result_list::iterator l_it = thread_llx_results_[thread_id]->begin();
llx_result_list::iterator l_end = thread_llx_results_[thread_id].end(); llx_result_list::iterator l_end = thread_llx_results_[thread_id]->end();
// Find LLX result of r in thread-local LLX results: // Find LLX result of r in thread-local LLX results:
for (; l_it != l_end && l_it->data_record != *it; ++l_it); for (; l_it != l_end && l_it->data_record != *it; ++l_it);
if (l_it == l_end) { if (l_it == l_end) {
...@@ -155,14 +165,14 @@ bool LlxScx<UserData, ValuePool>::TryStoreConditional( ...@@ -155,14 +165,14 @@ bool LlxScx<UserData, ValuePool>::TryStoreConditional(
EMBB_THROW(embb::base::ErrorException, EMBB_THROW(embb::base::ErrorException,
"Missing preceding LLX on a data record used for SCX"); "Missing preceding LLX on a data record used for SCX");
} }
// Copy of r's info value in this threads local table of LLX results // ScxRecord_t scx_op(*(l_it->data_record->ScxInfo().Load()));
ScxRecord_t scx_op(*(l_it->data_record->ScxInfo().Load())); info_fields->PushBack(
info_fields->PushBack(scx_op); l_it->data_record->ScxInfo().Load());
} }
// Announce SCX operation. Lists linked_deps and finalize_dep are // Announce SCX operation. Lists linked_deps and finalize_dep are
// guaranteed to remain on the stack until this announced operation // guaranteed to remain on the stack until this announced operation
// is completed, so no allocation/pool is necessary. // is completed, so no allocation/pool is necessary.
ScxRecord_t scx( ScxRecord_t new_scx(
linked_deps, linked_deps,
finalize_deps, finalize_deps,
// target field: // target field:
...@@ -175,7 +185,9 @@ bool LlxScx<UserData, ValuePool>::TryStoreConditional( ...@@ -175,7 +185,9 @@ bool LlxScx<UserData, ValuePool>::TryStoreConditional(
info_fields, info_fields,
// initial operation state: // initial operation state:
OperationState::InProgress); OperationState::InProgress);
return scx.Help(); // Allocate from pool as this operation description is global:
ScxRecord_t * scx = scx_record_pool_.Allocate(new_scx);
return scx->Help();
} }
template< typename UserData, typename ValuePool > template< typename UserData, typename ValuePool >
...@@ -209,9 +221,10 @@ bool internal::ScxRecord<DataRecord>::Help() { ...@@ -209,9 +221,10 @@ bool internal::ScxRecord<DataRecord>::Help() {
// the information in the SCX record of S' to help S' // the information in the SCX record of S' to help S'
// complete, so that the data record can be unfrozen. // complete, so that the data record can be unfrozen.
typedef embb::containers::internal::FixedSizeList<DataRecord *> dr_list_t; typedef embb::containers::internal::FixedSizeList<DataRecord *> dr_list_t;
typedef embb::containers::internal::FixedSizeList<self_t> op_list_t; typedef embb::containers::internal::FixedSizeList<self_t *> op_list_t;
// Freeze all data records in data_records to protect their // Freeze all data records in data_records (i.e. reserve them for this
// mutable fields from being changed by other SCXs: // SCX operation) to protect their mutable fields from being changed by
// other SCXs:
dr_list_t::iterator linked_it = linked_data_records_->begin(); dr_list_t::iterator linked_it = linked_data_records_->begin();
dr_list_t::iterator linked_end = linked_data_records_->end(); dr_list_t::iterator linked_end = linked_data_records_->end();
op_list_t::iterator scx_op_it = scx_ops_->begin(); op_list_t::iterator scx_op_it = scx_ops_->begin();
...@@ -219,21 +232,24 @@ bool internal::ScxRecord<DataRecord>::Help() { ...@@ -219,21 +232,24 @@ bool internal::ScxRecord<DataRecord>::Help() {
for (; linked_it != linked_end && scx_op_it != scx_op_end; for (; linked_it != linked_end && scx_op_it != scx_op_end;
++linked_it, ++scx_op_it) { ++linked_it, ++scx_op_it) {
DataRecord * r = *linked_it; DataRecord * r = *linked_it;
// pointer indexed by r in this->info_fields: ScxRecord<DataRecord> * rinfo_old = *scx_op_it;
ScxRecord<DataRecord> * rinfo_exp = &(*scx_op_it); // Try to freeze the data record by setting its SCX info field
if (!r->ScxInfo().CompareAndSwap(rinfo_exp, this)) { // to this SCX operation description:
if (!r->ScxInfo().CompareAndSwap(rinfo_old, this)) {
if (r->ScxInfo().Load() != this) { if (r->ScxInfo().Load() != this) {
// could not freeze r because it is frozen for // could not freeze r because it is frozen for another SCX:
// another SCX:
if (all_frozen_) { if (all_frozen_) {
// SCX already completed: // SCX already completed:
return true; return true;
} }
// atomically unfreeze all nodes frozen for this SCX: // Atomically unfreeze all nodes frozen for this SCX (see LLX):
state_ = Aborted; state_ = Aborted;
return false; return false;
} }
} }
else {
// free_scx_ops.PushBack(rinfo_old);
}
} }
// finished freezing data records // finished freezing data records
assert(state_ == InProgress || state_ == Comitted); assert(state_ == InProgress || state_ == Comitted);
......
...@@ -81,12 +81,14 @@ class ScxRecord { ...@@ -81,12 +81,14 @@ class ScxRecord {
* Constructor. * Constructor.
*/ */
ScxRecord( ScxRecord(
embb::containers::internal::FixedSizeList<DataRecord *> & linked_data_records, embb::containers::internal::FixedSizeList<DataRecord *> &
embb::containers::internal::FixedSizeList<DataRecord *> & finalize_data_records, linked_data_records,
embb::containers::internal::FixedSizeList<DataRecord *> &
finalize_data_records,
embb::base::Atomic<cas_t> * field, embb::base::Atomic<cas_t> * field,
cas_t new_value, cas_t new_value,
cas_t old_value, cas_t old_value,
embb::containers::internal::FixedSizeList<self_t> * scx_ops, embb::containers::internal::FixedSizeList<self_t *> * scx_ops,
OperationState operation_state) OperationState operation_state)
: linked_data_records_(&linked_data_records), : linked_data_records_(&linked_data_records),
finalize_data_records_(&finalize_data_records), finalize_data_records_(&finalize_data_records),
...@@ -145,7 +147,7 @@ class ScxRecord { ...@@ -145,7 +147,7 @@ class ScxRecord {
* List of SCX operation descriptions associated with data records * List of SCX operation descriptions associated with data records
* linked with this SCX operation. * linked with this SCX operation.
*/ */
embb::containers::internal::FixedSizeList<self_t> * scx_ops_; embb::containers::internal::FixedSizeList<self_t *> * scx_ops_;
/** /**
* Current state of this SCX record. * Current state of this SCX record.
...@@ -197,7 +199,7 @@ class LlxScxRecord { ...@@ -197,7 +199,7 @@ class LlxScxRecord {
*/ */
LlxScxRecord(const LlxScxRecord & other) LlxScxRecord(const LlxScxRecord & other)
: user_data_(other.user_data_) { : user_data_(other.user_data_) {
scx_op_.Store(other.scx_info_.Load()); scx_op_.Store(other.scx_op_.Load());
marked_for_finalize_ = other.marked_for_finalize_; marked_for_finalize_ = other.marked_for_finalize_;
} }
...@@ -236,9 +238,6 @@ class LlxScxRecord { ...@@ -236,9 +238,6 @@ class LlxScxRecord {
} }
/** /**
* While this SCX is active, the info field acts as a kind of lock
* on the data record, granting exclusive access to this SCX, rather
* than to a process.
* A data record r is frozen for an SCX-record U if r.info points to * A data record r is frozen for an SCX-record U if r.info points to
* U and either U.state is InProgress, or U.state is Committed and r * U and either U.state is InProgress, or U.state is Committed and r
* is marked. * is marked.
...@@ -275,7 +274,7 @@ class LlxScxRecord { ...@@ -275,7 +274,7 @@ class LlxScxRecord {
return marked_for_finalize_; return marked_for_finalize_;
} }
private: private:
/** /**
* Instance of the user-defined data type containing mutable * Instance of the user-defined data type containing mutable
* fields. * fields.
...@@ -407,33 +406,17 @@ class LlxScx { ...@@ -407,33 +406,17 @@ class LlxScx {
/** /**
* Shared table containing for each r in V, a copy of r's info * Shared table containing for each r in V, a copy of r's info
* value in this thread's local table of LLX results. * value in this thread's local table of LLX results.
*
* thread_id -> {
* r_1 in V -> *ScxRecord(thread_llx_results_[r_1].data_record.ScxInfo()),
* ...
* r_i in V -> *ScxRecord(thread_llx_results_[r_i].data_record.ScxInfo())
* }
*/
// embb::containers::internal::FixedSizeList<ScxRecord_t> * info_fields_;
/**
* Shared table containing for each r in V, a copy of r's info
* value in this thread's local table of LLX results.
*
* thread_id -> {
* r_1 in V -> *ScxRecord(thread_llx_results_[r_1].data_record.ScxInfo()),
* ...
* r_i in V -> *ScxRecord(thread_llx_results_[r_i].data_record.ScxInfo())
* }
*/ */
embb::containers::ObjectPool< embb::containers::ObjectPool<
embb::containers::internal::FixedSizeList<ScxRecord_t>, ValuePool > embb::containers::internal::FixedSizeList<ScxRecord_t *>, ValuePool >
scx_record_list_pool_; scx_record_list_pool_;
embb::containers::ObjectPool< ScxRecord_t, ValuePool > scx_record_pool_;
/** /**
* Thread-specific list of LLX results performed by the thread. * Thread-specific list of LLX results performed by the thread.
*/ */
embb::containers::internal::FixedSizeList<LlxResult> * embb::containers::internal::FixedSizeList<LlxResult> **
thread_llx_results_; thread_llx_results_;
/** /**
......
...@@ -38,8 +38,23 @@ using embb::containers::primitives::LlxScx; ...@@ -38,8 +38,23 @@ using embb::containers::primitives::LlxScx;
LlxScxTest::LlxScxTest() : LlxScxTest::LlxScxTest() :
num_threads_( num_threads_(
static_cast<int>(partest::TestSuite::GetDefaultNumThreads())) { static_cast<int>(partest::TestSuite::GetDefaultNumThreads())),
llxscx_(3),
tail(0, '-'),
head(0, '-', Node::node_ptr_t(&tail)) {
CreateUnit("SerialTest").Add(&LlxScxTest::SerialTest, this); CreateUnit("SerialTest").Add(&LlxScxTest::SerialTest, this);
CreateUnit("ParallelTest").Add(&LlxScxTest::ParallelTest, this);
}
void LlxScxTest::ParallelTest() {
typedef LlxScxTest::Node Node;
unsigned int thread_index;
int return_val = embb_internal_thread_index(&thread_index);
if (return_val != EMBB_SUCCESS)
EMBB_THROW(embb::base::ErrorException, "Could not get thread id!");
// Threads try to append n nodes to a linked list in parallel
} }
void LlxScxTest::SerialTest() { void LlxScxTest::SerialTest() {
...@@ -66,10 +81,13 @@ void LlxScxTest::SerialTest() { ...@@ -66,10 +81,13 @@ void LlxScxTest::SerialTest() {
bool finalized; bool finalized;
PT_ASSERT(llxscx.TryLoadLinked(&dr1, l1, finalized)); PT_ASSERT(llxscx.TryLoadLinked(&dr1, l1, finalized));
PT_ASSERT(!finalized); PT_ASSERT(!finalized);
PT_ASSERT_EQ(l1->value_, dr1->value_);
PT_ASSERT(llxscx.TryLoadLinked(&dr2, l2, finalized)); PT_ASSERT(llxscx.TryLoadLinked(&dr2, l2, finalized));
PT_ASSERT(!finalized); PT_ASSERT(!finalized);
PT_ASSERT_EQ(l2->value_, dr2->value_);
PT_ASSERT(llxscx.TryLoadLinked(&dr3, l3, finalized)); PT_ASSERT(llxscx.TryLoadLinked(&dr3, l3, finalized));
PT_ASSERT(!finalized); PT_ASSERT(!finalized);
PT_ASSERT_EQ(l3->value_, dr3->value_);
FixedSizeList< LlxScxRecord<Node> * > FixedSizeList< LlxScxRecord<Node> * >
linked_deps(3); linked_deps(3);
......
...@@ -40,12 +40,11 @@ class LlxScxTest : public partest::TestCase { ...@@ -40,12 +40,11 @@ class LlxScxTest : public partest::TestCase {
class Node { class Node {
public: public:
typedef primitives::LlxScxRecord<Node> * node_ptr_t; typedef primitives::LlxScxRecord<Node> * node_ptr_t;
private:
char value_;
public: public:
embb::base::Atomic<primitives::LlxScxRecord<Node> *> next_; embb::base::Atomic<primitives::LlxScxRecord<Node> *> next_;
embb::base::Atomic<int> count_; embb::base::Atomic<int> count_;
char value_;
public: public:
Node() Node()
...@@ -90,9 +89,12 @@ class LlxScxTest : public partest::TestCase { ...@@ -90,9 +89,12 @@ class LlxScxTest : public partest::TestCase {
private: private:
void SerialTest(); void SerialTest();
void ParallelTest();
int num_threads_; int num_threads_;
LlxScx<Node> llxscx_;
Node tail;
Node head;
}; };
} // namespace test } // namespace test
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment