Commit 34f6e4f3 by Danila Klimenko

chromatic_tree: lock-free implementation without rebalancing

parent e518a31d
...@@ -478,7 +478,7 @@ Type& UniqueHazardPointer<Type>::operator*() const { ...@@ -478,7 +478,7 @@ Type& UniqueHazardPointer<Type>::operator*() const {
} }
template<typename Type> template<typename Type>
void UniqueHazardPointer<Type>::AdoptGuard(const UniqueHazardPointer& other) { void UniqueHazardPointer<Type>::AdoptHazard(const UniqueHazardPointer& other) {
assert(OwnsHazardGuard()); assert(OwnsHazardGuard());
StoreGuardedPointer(other.LoadGuardedPointer()); StoreGuardedPointer(other.LoadGuardedPointer());
SetActive(other.active_); SetActive(other.active_);
......
...@@ -620,7 +620,7 @@ class UniqueHazardPointer { ...@@ -620,7 +620,7 @@ class UniqueHazardPointer {
* \param other Another wrapper those protected pointer is to be protected by * \param other Another wrapper those protected pointer is to be protected by
* the calling wrapper * the calling wrapper
*/ */
void AdoptGuard(const UniqueHazardPointer& other); void AdoptHazard(const UniqueHazardPointer& other);
/** /**
* Swaps the guard ownership with another wrapper. Swaps not just the * Swaps the guard ownership with another wrapper. Swaps not just the
......
...@@ -45,23 +45,26 @@ namespace internal { ...@@ -45,23 +45,26 @@ namespace internal {
template<typename Key, typename Value> template<typename Key, typename Value>
ChromaticTreeNode<Key, Value>:: ChromaticTreeNode<Key, Value>::
ChromaticTreeNode(const Key& key, const Value& value, int weight, ChromaticTreeNode(const Key& key, const Value& value, int weight,
Node* left, Node* right) Node* left, Node* right, Operation* operation)
: key_(key), : key_(key),
value_(value), value_(value),
weight_(weight), weight_(weight),
left_(left), left_(left),
right_(right), right_(right),
retired_(false) {} retired_(false),
operation_(operation) {}
template<typename Key, typename Value> template<typename Key, typename Value>
ChromaticTreeNode<Key, Value>:: ChromaticTreeNode<Key, Value>::
ChromaticTreeNode(const Key& key, const Value& value, int weight) ChromaticTreeNode(const Key& key, const Value& value, int weight,
Operation* operation)
: key_(key), : key_(key),
value_(value), value_(value),
weight_(weight), weight_(weight),
left_(NULL), left_(NULL),
right_(NULL), right_(NULL),
retired_(false) {} retired_(false),
operation_(operation) {}
template<typename Key, typename Value> template<typename Key, typename Value>
const Key& ChromaticTreeNode<Key, Value>::GetKey() const { const Key& ChromaticTreeNode<Key, Value>::GetKey() const {
...@@ -126,11 +129,6 @@ bool ChromaticTreeNode<Key, Value>::IsRetired() const { ...@@ -126,11 +129,6 @@ bool ChromaticTreeNode<Key, Value>::IsRetired() const {
return retired_; return retired_;
} }
template<typename Key, typename Value>
embb::base::Mutex& ChromaticTreeNode<Key, Value>::GetMutex() {
return mutex_;
}
} // namespace internal } // namespace internal
...@@ -144,7 +142,11 @@ ChromaticTree(size_t capacity, Key undefined_key, Value undefined_value, ...@@ -144,7 +142,11 @@ ChromaticTree(size_t capacity, Key undefined_key, Value undefined_value,
#endif #endif
: node_hazard_manager_( : node_hazard_manager_(
embb::base::Function<void, Node*>(*this, &ChromaticTree::FreeNode), embb::base::Function<void, Node*>(*this, &ChromaticTree::FreeNode),
NULL, 10), NULL, HIDX_MAX),
operation_hazard_manager_(
embb::base::Function<void, Operation*>(*this,
&ChromaticTree::FreeOperation),
NULL, HIDX_MAX),
#ifdef EMBB_PLATFORM_COMPILER_MSVC #ifdef EMBB_PLATFORM_COMPILER_MSVC
#pragma warning(pop) #pragma warning(pop)
#endif #endif
...@@ -155,10 +157,16 @@ ChromaticTree(size_t capacity, Key undefined_key, Value undefined_value, ...@@ -155,10 +157,16 @@ ChromaticTree(size_t capacity, Key undefined_key, Value undefined_value,
node_pool_(2 + 5 + 2 * capacity_ + node_pool_(2 + 5 + 2 * capacity_ +
node_hazard_manager_.GetRetiredListMaxSize() * node_hazard_manager_.GetRetiredListMaxSize() *
embb::base::Thread::GetThreadsMaxCount()), embb::base::Thread::GetThreadsMaxCount()),
operation_pool_(2 + 5 + 2 * capacity_ +
operation_hazard_manager_.GetRetiredListMaxSize() *
embb::base::Thread::GetThreadsMaxCount()),
entry_(node_pool_.Allocate(undefined_key_, undefined_value_, 1, entry_(node_pool_.Allocate(undefined_key_, undefined_value_, 1,
node_pool_.Allocate(undefined_key_, node_pool_.Allocate(undefined_key_,
undefined_value_), undefined_value_,
static_cast<Node*>(NULL))) { 1,
Operation::INITIAL_DUMMY),
static_cast<Node*>(NULL),
Operation::INITIAL_DUMMY)) {
assert(entry_ != NULL); assert(entry_ != NULL);
assert(entry_->GetLeft() != NULL); assert(entry_->GetLeft() != NULL);
} }
...@@ -173,9 +181,10 @@ ChromaticTree<Key, Value, Compare, ValuePool>:: ...@@ -173,9 +181,10 @@ ChromaticTree<Key, Value, Compare, ValuePool>::
template<typename Key, typename Value, typename Compare, typename ValuePool> template<typename Key, typename Value, typename Compare, typename ValuePool>
bool ChromaticTree<Key, Value, Compare, ValuePool>:: bool ChromaticTree<Key, Value, Compare, ValuePool>::
Get(const Key& key, Value& value) { Get(const Key& key, Value& value) {
HazardNodePtr parent(node_hazard_manager_.GetGuardedPointer(0)); HazardNodePtr grandparent(GetNodeGuard(HIDX_GRANDPARENT));
HazardNodePtr leaf (node_hazard_manager_.GetGuardedPointer(1)); HazardNodePtr parent(GetNodeGuard(HIDX_PARENT));
Search(key, leaf, parent); HazardNodePtr leaf(GetNodeGuard(HIDX_LEAF));
Search(key, leaf, parent, grandparent);
bool keys_are_equal = !IsSentinel(leaf) && bool keys_are_equal = !IsSentinel(leaf) &&
!(compare_(key, leaf->GetKey()) || !(compare_(key, leaf->GetKey()) ||
...@@ -205,52 +214,87 @@ TryInsert(const Key& key, const Value& value, Value& old_value) { ...@@ -205,52 +214,87 @@ TryInsert(const Key& key, const Value& value, Value& old_value) {
bool added_violation = false; bool added_violation = false;
while (!insertion_succeeded) { while (!insertion_succeeded) {
HazardNodePtr parent(node_hazard_manager_.GetGuardedPointer(0)); HazardNodePtr grandparent(GetNodeGuard(HIDX_GRANDPARENT));
HazardNodePtr leaf (node_hazard_manager_.GetGuardedPointer(1)); HazardNodePtr parent(GetNodeGuard(HIDX_PARENT));
Search(key, leaf, parent); HazardNodePtr leaf(GetNodeGuard(HIDX_LEAF));
Search(key, leaf, parent, grandparent);
// Try to lock the parent // Protect the parent
UniqueLock parent_lock(parent->GetMutex(), embb::base::try_lock); HazardOperationPtr parent_op(GetOperationGuard(HIDX_PARENT));
if (!parent_lock.OwnsLock() || parent->IsRetired()) continue; if (!WeakLLX(parent, parent_op)) continue;
// Verify that the leaf is still the parent's child // Verify that the leaf is still the parent's child
if (!HasChild(parent, leaf)) continue; if (!HasChild(parent, leaf)) continue;
// Try to lock the leaf // Protect the leaf
UniqueLock leaf_lock(leaf->GetMutex(), embb::base::try_lock); HazardOperationPtr leaf_op(GetOperationGuard(HIDX_LEAF));
if (!leaf_lock.OwnsLock() || leaf->IsRetired()) continue; if (!WeakLLX(leaf, leaf_op)) continue;
// Reached leaf has a matching key: replace it with a new copy bool keys_are_equal = !IsSentinel(leaf) &&
if (!IsSentinel(leaf) && !(compare_(key, leaf->GetKey()) || !(compare_(key, leaf->GetKey()) ||
compare_(leaf->GetKey(), key))) { compare_(leaf->GetKey(), key));
if (keys_are_equal) {
// Reached leaf has a matching key: replace it with a new copy
old_value = leaf->GetValue(); old_value = leaf->GetValue();
new_parent = node_pool_.Allocate(key, value, leaf->GetWeight()); new_parent = node_pool_.Allocate(key, value, leaf->GetWeight(),
Operation::INITIAL_DUMMY);
if (new_parent == NULL) break; if (new_parent == NULL) break;
// Reached leaf has a different key: add a new leaf // Reached leaf has a different key: add a new leaf
} else { } else {
old_value = undefined_value_; old_value = undefined_value_;
new_leaf = node_pool_.Allocate(key, value); new_leaf = node_pool_.Allocate(key, value, 1,
Operation::INITIAL_DUMMY);
if (new_leaf == NULL) break; if (new_leaf == NULL) break;
new_sibling = node_pool_.Allocate(leaf->GetKey(), leaf->GetValue()); new_sibling = node_pool_.Allocate(leaf->GetKey(), leaf->GetValue(), 1,
Operation::INITIAL_DUMMY);
if (new_sibling == NULL) break; if (new_sibling == NULL) break;
int new_weight = (IsSentinel(parent)) ? 1 : (leaf->GetWeight() - 1); int new_weight = (IsSentinel(parent)) ? 1 : (leaf->GetWeight() - 1);
if (IsSentinel(leaf) || compare_(key, leaf->GetKey())) { if (IsSentinel(leaf) || compare_(key, leaf->GetKey())) {
new_parent = node_pool_.Allocate(leaf->GetKey(), undefined_value_, new_parent = node_pool_.Allocate(leaf->GetKey(), undefined_value_,
new_weight, new_leaf, new_sibling); new_weight, new_leaf, new_sibling,
Operation::INITIAL_DUMMY);
} else { } else {
new_parent = node_pool_.Allocate(key, undefined_value_, new_parent = node_pool_.Allocate(key, undefined_value_,
new_weight, new_sibling, new_leaf); new_weight, new_sibling, new_leaf,
Operation::INITIAL_DUMMY);
} }
if (new_parent == NULL) break; if (new_parent == NULL) break;
} }
insertion_succeeded = parent->ReplaceChild(leaf, new_parent); // Create and fill the operation object
assert(insertion_succeeded); // For now (FGL tree) this CAS may not fail HazardOperationPtr insert_op(GetOperationGuard(HIDX_CURRENT_OP));
if (!insertion_succeeded) continue; insert_op.ProtectSafe(operation_pool_.Allocate());
if (insert_op == NULL) break;
insert_op->SetRoot(parent, parent_op);
insert_op->SetOldNodes(leaf, leaf_op);
insert_op->SetNewChild(new_parent);
// Execute operation
insertion_succeeded = insert_op->Help(GetNodeGuard(HIDX_HELPING),
GetOperationGuard(HIDX_HELPING));
insert_op->CleanUp();
// If operation failed
if (!insertion_succeeded) {
// Retire failed operation
RetireOperation(insert_op);
// Delete new nodes
FreeNode(new_parent); new_parent = NULL;
if (!keys_are_equal) {
FreeNode(new_leaf); new_leaf = NULL;
FreeNode(new_sibling); new_sibling = NULL;
}
// Restart from scratch
continue;
}
RetireHazardousNode(leaf, leaf_lock); // Retire old operation objects
RetireOperation(parent_op);
RetireOperation(leaf_op);
// Retire old nodes
RetireNode(leaf);
added_violation = (parent->GetWeight() == 0 && added_violation = (parent->GetWeight() == 0 &&
new_parent->GetWeight() == 0); new_parent->GetWeight() == 0);
...@@ -282,9 +326,9 @@ TryDelete(const Key& key, Value& old_value) { ...@@ -282,9 +326,9 @@ TryDelete(const Key& key, Value& old_value) {
bool added_violation = false; bool added_violation = false;
while (!deletion_succeeded) { while (!deletion_succeeded) {
HazardNodePtr grandparent(node_hazard_manager_.GetGuardedPointer(0)); HazardNodePtr grandparent(GetNodeGuard(HIDX_GRANDPARENT));
HazardNodePtr parent (node_hazard_manager_.GetGuardedPointer(1)); HazardNodePtr parent(GetNodeGuard(HIDX_PARENT));
HazardNodePtr leaf (node_hazard_manager_.GetGuardedPointer(2)); HazardNodePtr leaf(GetNodeGuard(HIDX_LEAF));
Search(key, leaf, parent, grandparent); Search(key, leaf, parent, grandparent);
// Reached leaf has a different key - nothing to delete // Reached leaf has a different key - nothing to delete
...@@ -295,18 +339,18 @@ TryDelete(const Key& key, Value& old_value) { ...@@ -295,18 +339,18 @@ TryDelete(const Key& key, Value& old_value) {
break; break;
} }
// Try to lock the grandparent // Protect the grandparent
UniqueLock grandparent_lock(grandparent->GetMutex(), embb::base::try_lock); HazardOperationPtr grandparent_op(GetOperationGuard(HIDX_GRANDPARENT));
if (!grandparent_lock.OwnsLock() || grandparent->IsRetired()) continue; if (!WeakLLX(grandparent, grandparent_op)) continue;
// Verify that the parent is still the grandparent's child // Verify that the parent is still the child of grandparent
if (!HasChild(grandparent, parent)) continue; if (!HasChild(grandparent, parent)) continue;
// Try to lock the parent // Protect the parent
UniqueLock parent_lock(parent->GetMutex(), embb::base::try_lock); HazardOperationPtr parent_op(GetOperationGuard(HIDX_PARENT));
if (!parent_lock.OwnsLock() || parent->IsRetired()) continue; if (!WeakLLX(parent, parent_op)) continue;
// Get the sibling (and protect it with hazard pointer) // Get the sibling (and protect it with hazard pointer)
HazardNodePtr sibling(node_hazard_manager_.GetGuardedPointer(3)); HazardNodePtr sibling(GetNodeGuard(HIDX_SIBLING));
sibling.ProtectHazard((parent->GetLeft() == leaf) ? sibling.ProtectHazard((parent->GetLeft() == leaf) ?
parent->GetRight() : parent->GetLeft()); parent->GetRight() : parent->GetLeft());
if (parent->IsRetired() || !sibling.IsActive()) continue; if (parent->IsRetired() || !sibling.IsActive()) continue;
...@@ -315,31 +359,56 @@ TryDelete(const Key& key, Value& old_value) { ...@@ -315,31 +359,56 @@ TryDelete(const Key& key, Value& old_value) {
// Verify that the leaf is still the parent's child // Verify that the leaf is still the parent's child
if (!HasChild(parent, leaf)) continue; if (!HasChild(parent, leaf)) continue;
// Try to lock the sibling // Protect the sibling
UniqueLock sibling_lock(sibling->GetMutex(), embb::base::try_lock); HazardOperationPtr sibling_op(GetOperationGuard(HIDX_SIBLING));
if (!sibling_lock.OwnsLock() || sibling->IsRetired()) continue; if (!WeakLLX(sibling, sibling_op)) continue;
// Try to lock the leaf // Protect the leaf
UniqueLock leaf_lock(leaf->GetMutex(), embb::base::try_lock); HazardOperationPtr leaf_op(GetOperationGuard(HIDX_LEAF));
if (!leaf_lock.OwnsLock() || leaf->IsRetired()) continue; if (!WeakLLX(leaf, leaf_op)) continue;
int new_weight = (IsSentinel(grandparent)) ? int new_weight = (IsSentinel(grandparent)) ?
1 : (parent->GetWeight() + sibling->GetWeight()); 1 : (parent->GetWeight() + sibling->GetWeight());
new_leaf = node_pool_.Allocate( new_leaf = node_pool_.Allocate(
sibling->GetKey(), sibling->GetValue(), new_weight, sibling->GetKey(), sibling->GetValue(), new_weight,
sibling->GetLeft(), sibling->GetRight()); sibling->GetLeft(), sibling->GetRight(), Operation::INITIAL_DUMMY);
if (new_leaf == NULL) break; if (new_leaf == NULL) break;
old_value = leaf->GetValue(); old_value = leaf->GetValue();
deletion_succeeded = grandparent->ReplaceChild(parent, new_leaf); // Create and fill the operation object
assert(deletion_succeeded); // For now (FGL tree) this CAS may not fail HazardOperationPtr delete_op(GetOperationGuard(HIDX_CURRENT_OP));
if (!deletion_succeeded) continue; delete_op.ProtectSafe(operation_pool_.Allocate());
if (delete_op == NULL) break;
delete_op->SetRoot(grandparent, grandparent_op);
delete_op->SetOldNodes(parent, parent_op, leaf, leaf_op, sibling, sibling_op);
delete_op->SetNewChild(new_leaf);
// Execute operation
deletion_succeeded = delete_op->Help(GetNodeGuard(HIDX_HELPING),
GetOperationGuard(HIDX_HELPING));
delete_op->CleanUp();
// If operation failed
if (!deletion_succeeded) {
// Retire failed operation
RetireOperation(delete_op);
// Delete new nodes
FreeNode(new_leaf);
// Restart from scratch
continue;
}
RetireHazardousNode(parent, parent_lock); // Retire old operation objects
RetireHazardousNode(leaf, leaf_lock); RetireOperation(grandparent_op);
RetireHazardousNode(sibling, sibling_lock); RetireOperation(parent_op);
RetireOperation(leaf_op);
RetireOperation(sibling_op);
// Retire old nodes
RetireNode(parent);
RetireNode(leaf);
RetireNode(sibling);
added_violation = (new_weight > 1); added_violation = (new_weight > 1);
} }
...@@ -373,46 +442,37 @@ IsEmpty() const { ...@@ -373,46 +442,37 @@ IsEmpty() const {
template<typename Key, typename Value, typename Compare, typename ValuePool> template<typename Key, typename Value, typename Compare, typename ValuePool>
void ChromaticTree<Key, Value, Compare, ValuePool>:: void ChromaticTree<Key, Value, Compare, ValuePool>::
Search(const Key& key, HazardNodePtr& leaf, HazardNodePtr& parent) { Search(const Key& key, HazardNodePtr& leaf, HazardNodePtr& parent,
HazardNodePtr& grandparent) {
bool reached_leaf = false; bool reached_leaf = false;
while (!reached_leaf) { while (!reached_leaf) {
grandparent.ProtectSafe(entry_);
parent.ProtectSafe(entry_); parent.ProtectSafe(entry_);
leaf.ProtectHazard(entry_->GetLeft()); leaf.ProtectSafe(entry_);
if (parent->IsRetired() || !leaf.IsActive()) continue;
reached_leaf = IsLeaf(leaf); reached_leaf = IsLeaf(leaf);
while (!reached_leaf) { while (!reached_leaf) {
parent.AdoptGuard(leaf); grandparent.AdoptHazard(parent);
leaf.ProtectHazard((IsSentinel(leaf) || compare_(key, leaf->GetKey())) ? parent.AdoptHazard(leaf);
leaf->GetLeft() : leaf->GetRight());
if (parent->IsRetired() || !leaf.IsActive()) break;
VERIFY_ADDRESS(static_cast<Node*>(leaf));
reached_leaf = IsLeaf(leaf); AtomicNodePtr& next_leaf =
} (IsSentinel(leaf) || compare_(key, leaf->GetKey())) ?
} leaf->GetLeft() : leaf->GetRight();
}
template<typename Key, typename Value, typename Compare, typename ValuePool> // Parent is protected, so we can tolerate a changing child pointer
void ChromaticTree<Key, Value, Compare, ValuePool>:: while(!leaf.ProtectHazard(next_leaf));
Search(const Key& key, HazardNodePtr& leaf, HazardNodePtr& parent,
HazardNodePtr& grandparent) {
bool reached_leaf = false;
while (!reached_leaf) { // Parent is retired - make sure it is actually removed from the tree
grandparent.ProtectSafe(entry_); if (parent->IsRetired()) {
parent.ProtectSafe(entry_); HazardOperationPtr op(GetOperationGuard(HIDX_HELPING));
leaf.ProtectHazard(entry_->GetLeft()); if (op.ProtectHazard(parent->GetOperation())) {
if (parent->IsRetired() || !leaf.IsActive()) continue; op->HelpCommit(GetNodeGuard(HIDX_HELPING));
}
// Can't follow a child pointer in a retired node - restart from root
break;
}
reached_leaf = IsLeaf(leaf);
while (!reached_leaf) {
grandparent.AdoptGuard(parent);
parent.AdoptGuard(leaf);
leaf.ProtectHazard((IsSentinel(leaf) || compare_(key, leaf->GetKey())) ?
leaf->GetLeft() : leaf->GetRight());
if (parent->IsRetired() || !leaf.IsActive()) break;
VERIFY_ADDRESS(static_cast<Node*>(leaf)); VERIFY_ADDRESS(static_cast<Node*>(leaf));
reached_leaf = IsLeaf(leaf); reached_leaf = IsLeaf(leaf);
...@@ -490,69 +550,127 @@ IsBalanced(const Node* node) const { ...@@ -490,69 +550,127 @@ IsBalanced(const Node* node) const {
template<typename Key, typename Value, typename Compare, typename ValuePool> template<typename Key, typename Value, typename Compare, typename ValuePool>
void ChromaticTree<Key, Value, Compare, ValuePool>:: void ChromaticTree<Key, Value, Compare, ValuePool>::
RetireHazardousNode(HazardNodePtr& node, UniqueLock& node_lock) { RetireNode(HazardNodePtr& node) {
node->Retire(); node_hazard_manager_.EnqueuePointerForDeletion(node.ReleaseHazard());
node_lock.Unlock();
Node* node_to_delete = node.ReleaseHazard();
node_hazard_manager_.EnqueuePointerForDeletion(node_to_delete);
} }
template<typename Key, typename Value, typename Compare, typename ValuePool> template<typename Key, typename Value, typename Compare, typename ValuePool>
void ChromaticTree<Key, Value, Compare, ValuePool>:: void ChromaticTree<Key, Value, Compare, ValuePool>::
FreeNode(Node* node) { RetireOperation(HazardOperationPtr& operation) {
#ifdef EMBB_DEBUG Operation* op = operation.ReleaseHazard();
node->GetLeft() = reinterpret_cast<Node*>(INVALID_POINTER); if (op != Operation::INITIAL_DUMMY && op != Operation::RETIRED_DUMMY) {
node->GetRight() = reinterpret_cast<Node*>(INVALID_POINTER); operation_hazard_manager_.EnqueuePointerForDeletion(op);
#endif }
node_pool_.Free(node); }
template<typename Key, typename Value, typename Compare, typename ValuePool>
typename ChromaticTree<Key, Value, Compare, ValuePool>::AtomicNodePtr&
ChromaticTree<Key, Value, Compare, ValuePool>::
GetNodeGuard(HazardIndex index) {
return node_hazard_manager_.GetGuardedPointer(static_cast<int>(index));
}
template<typename Key, typename Value, typename Compare, typename ValuePool>
typename ChromaticTree<Key, Value, Compare, ValuePool>::AtomicOperationPtr&
ChromaticTree<Key, Value, Compare, ValuePool>::
GetOperationGuard(HazardIndex index) {
return operation_hazard_manager_.GetGuardedPointer(static_cast<int>(index));
} }
template<typename Key, typename Value, typename Compare, typename ValuePool> template<typename Key, typename Value, typename Compare, typename ValuePool>
bool ChromaticTree<Key, Value, Compare, ValuePool>:: bool ChromaticTree<Key, Value, Compare, ValuePool>::
CleanUp(const Key& key) { WeakLLX(HazardNodePtr& node, HazardOperationPtr& operation) {
HazardNodePtr grandgrandparent(node_hazard_manager_.GetGuardedPointer(0)); // Make sure we have a protected operation pointer
HazardNodePtr grandparent (node_hazard_manager_.GetGuardedPointer(1)); while (!operation.ProtectHazard(node->GetOperation()));
HazardNodePtr parent (node_hazard_manager_.GetGuardedPointer(2));
HazardNodePtr leaf (node_hazard_manager_.GetGuardedPointer(3));
bool reached_leaf = false;
while (!reached_leaf) { // Node is not retired and operation is committed - node is available
bool found_violation = false; if (!node->IsRetired() && operation->IsCommitted()) {
return true;
}
grandgrandparent.ProtectSafe(entry_); if (operation->IsAborted()) {
grandparent.ProtectSafe(entry_); // Operation is aborted, but the node is still frozen - unfreeze it
parent.ProtectSafe(entry_); operation->HelpAbort(node);
leaf.ProtectHazard(entry_->GetLeft());
if (parent->IsRetired() || !leaf.IsActive()) continue;
reached_leaf = IsLeaf(leaf); } else if (operation->IsInProgress()) {
while (!reached_leaf && !found_violation) { // Operation is still in progress - help it
grandgrandparent.AdoptGuard(grandparent); operation->Help(GetNodeGuard(HIDX_HELPING),
grandparent.AdoptGuard(parent); GetOperationGuard(HIDX_HELPING));
parent.AdoptGuard(leaf); }
leaf.ProtectHazard((IsSentinel(leaf) || compare_(key, leaf->GetKey())) ?
leaf->GetLeft() : leaf->GetRight());
if (parent->IsRetired() || !leaf.IsActive()) break;
VERIFY_ADDRESS(static_cast<Node*>(leaf));
found_violation = (leaf->GetWeight() > 1) || // LLX failed - operation pointer should not be exposed
(leaf->GetWeight() == 0 && parent->GetWeight() == 0); operation.ReleaseHazard();
reached_leaf = IsLeaf(leaf); return false;
} }
if (found_violation) { template<typename Key, typename Value, typename Compare, typename ValuePool>
reached_leaf = false; void ChromaticTree<Key, Value, Compare, ValuePool>::
FreeNode(Node* node) {
#ifdef EMBB_DEBUG
node->GetLeft() = reinterpret_cast<Node*>(INVALID_POINTER);
node->GetRight() = reinterpret_cast<Node*>(INVALID_POINTER);
#endif
node_pool_.Free(node);
}
if (Rebalance(grandgrandparent, grandparent, parent, leaf) == template<typename Key, typename Value, typename Compare, typename ValuePool>
EMBB_NOMEM) { void ChromaticTree<Key, Value, Compare, ValuePool>::
assert(false && "No memory for rebalancing!"); FreeOperation(Operation* operation) {
return false; #ifdef EMBB_DEBUG
} operation->SetDeleted();
} #endif
} operation_pool_.Free(operation);
}
template<typename Key, typename Value, typename Compare, typename ValuePool>
bool ChromaticTree<Key, Value, Compare, ValuePool>::
CleanUp(const Key& key) {
(void)(key);
return true; return true;
// HazardNodePtr grandgrandparent(node_hazard_manager_.GetGuardedPointer(0));
// HazardNodePtr grandparent (node_hazard_manager_.GetGuardedPointer(1));
// HazardNodePtr parent (node_hazard_manager_.GetGuardedPointer(2));
// HazardNodePtr leaf (node_hazard_manager_.GetGuardedPointer(3));
// bool reached_leaf = false;
//
// while (!reached_leaf) {
// bool found_violation = false;
//
// grandgrandparent.ProtectSafe(entry_);
// grandparent.ProtectSafe(entry_);
// parent.ProtectSafe(entry_);
// leaf.ProtectHazard(entry_->GetLeft());
// if (parent->IsRetired() || !leaf.IsActive()) continue;
//
// reached_leaf = IsLeaf(leaf);
// while (!reached_leaf && !found_violation) {
// grandgrandparent.AdoptGuard(grandparent);
// grandparent.AdoptGuard(parent);
// parent.AdoptGuard(leaf);
// leaf.ProtectHazard((IsSentinel(leaf) || compare_(key, leaf->GetKey())) ?
// leaf->GetLeft() : leaf->GetRight());
// if (parent->IsRetired() || !leaf.IsActive()) break;
// VERIFY_ADDRESS(static_cast<Node*>(leaf));
//
// found_violation = (leaf->GetWeight() > 1) ||
// (leaf->GetWeight() == 0 && parent->GetWeight() == 0);
//
// reached_leaf = IsLeaf(leaf);
// }
//
// if (found_violation) {
// reached_leaf = false;
//
// if (Rebalance(grandgrandparent, grandparent, parent, leaf) ==
// EMBB_NOMEM) {
// assert(false && "No memory for rebalancing!");
// return false;
// }
// }
// }
//
// return true;
} }
#define PROTECT_NODE_WITH_LOCK(node, lock_name) \ #define PROTECT_NODE_WITH_LOCK(node, lock_name) \
......
...@@ -201,6 +201,20 @@ Type* ObjectPool<Type, ValuePool, ObjectAllocator>::Allocate( ...@@ -201,6 +201,20 @@ Type* ObjectPool<Type, ValuePool, ObjectAllocator>::Allocate(
} }
template<class Type, typename ValuePool, class ObjectAllocator> template<class Type, typename ValuePool, class ObjectAllocator>
template<typename Param1, typename Param2, typename Param3, typename Param4,
typename Param5, typename Param6>
Type* ObjectPool<Type, ValuePool, ObjectAllocator>::Allocate(
Param1 const& param1, Param2 const& param2,
Param3 const& param3, Param4 const& param4,
Param5 const& param5, Param6 const& param6) {
Type* rawObject = AllocateRaw();
if (rawObject != NULL)
new (rawObject)Type(param1, param2, param3, param4, param5, param6);
return rawObject;
}
template<class Type, typename ValuePool, class ObjectAllocator>
ObjectPool<Type, ValuePool, ObjectAllocator>::~ObjectPool() { ObjectPool<Type, ValuePool, ObjectAllocator>::~ObjectPool() {
// Deallocate the objects // Deallocate the objects
objectAllocator.deallocate(objects, capacity); objectAllocator.deallocate(objects, capacity);
......
...@@ -38,6 +38,9 @@ namespace embb { ...@@ -38,6 +38,9 @@ namespace embb {
namespace containers { namespace containers {
namespace internal { namespace internal {
template<typename Key, typename Value>
class ChromaticTreeOperation;
/** /**
* Tree node * Tree node
* *
...@@ -50,8 +53,10 @@ namespace internal { ...@@ -50,8 +53,10 @@ namespace internal {
template<typename Key, typename Value> template<typename Key, typename Value>
class ChromaticTreeNode { class ChromaticTreeNode {
public: public:
typedef ChromaticTreeNode<Key, Value> Node; typedef ChromaticTreeNode Node;
typedef embb::base::Atomic<Node*> AtomicNodePtr; typedef embb::base::Atomic<Node*> AtomicNodePtr;
typedef ChromaticTreeOperation<Key, Value> Operation;
typedef embb::base::Atomic<Operation*> AtomicOperationPtr;
/** /**
* Creates a node with given parameters. * Creates a node with given parameters.
...@@ -63,17 +68,17 @@ class ChromaticTreeNode { ...@@ -63,17 +68,17 @@ class ChromaticTreeNode {
* \param[IN] right Pointer to the right child node * \param[IN] right Pointer to the right child node
*/ */
ChromaticTreeNode(const Key& key, const Value& value, int weight, ChromaticTreeNode(const Key& key, const Value& value, int weight,
Node* left, Node* right); Node* left, Node* right, Operation* operation);
/** /**
* Creates a node given only a key-value pair. Node will have no child nodes * Creates a node with given parameters and no child nodes.
* and a default weight (1).
* *
* \param[IN] key Key of the new node * \param[IN] key Key of the new node
* \param[IN] value Value of the new node * \param[IN] value Value of the new node
* \param[IN] weight Weight of the new node * \param[IN] weight Weight of the new node
*/ */
ChromaticTreeNode(const Key& key, const Value& value, int weight = 1); ChromaticTreeNode(const Key& key, const Value& value, int weight,
Operation* operation);
/** /**
* Accessor for the stored key. * Accessor for the stored key.
...@@ -139,29 +144,328 @@ class ChromaticTreeNode { ...@@ -139,29 +144,328 @@ class ChromaticTreeNode {
bool IsRetired() const; bool IsRetired() const;
/** /**
* Accessor for the FGL mutex * Accessor for the operation pointer of the node
* *
* \return Reference to this node's mutex * \return Reference to this node's operation pointer
*/ */
embb::base::Mutex& GetMutex(); AtomicOperationPtr& GetOperation() {
return operation_;
}
private: private:
typedef embb::base::Atomic<bool> AtomicFlag;
/** /**
* Disable copy construction and assignment. * Disable copy construction and assignment.
*/ */
ChromaticTreeNode(const ChromaticTreeNode&); ChromaticTreeNode(const ChromaticTreeNode&);
ChromaticTreeNode& operator=(const ChromaticTreeNode&); ChromaticTreeNode& operator=(const ChromaticTreeNode&);
const Key key_; /**< Stored key */ const Key key_; /**< Stored key */
const Value value_; /**< Stored value */ const Value value_; /**< Stored value */
const int weight_; /**< Weight of the node */ const int weight_; /**< Weight of the node */
AtomicNodePtr left_; /**< Pointer to left child node */ AtomicNodePtr left_; /**< Pointer to left child node */
AtomicNodePtr right_; /**< Pointer to right child node */ AtomicNodePtr right_; /**< Pointer to right child node */
embb::base::Atomic<bool> retired_; /**< Retired (marked for deletion) flag */ AtomicFlag retired_; /**< Retired (marked for deletion) flag */
AtomicOperationPtr operation_; /**< Pointer to a tree operation object */
embb::base::Mutex mutex_; /**< Fine-grained locking tree: per node mutex */
}; };
template<typename Key, typename Value>
class ChromaticTreeOperation {
public:
typedef ChromaticTreeNode<Key, Value> Node;
typedef embb::base::Atomic<Node*> AtomicNodePtr;
typedef UniqueHazardPointer<Node> HazardNodePtr;
typedef ChromaticTreeOperation Operation;
typedef embb::base::Atomic<Operation*> AtomicOperationPtr;
typedef UniqueHazardPointer<Operation> HazardOperationPtr;
static Operation* const INITIAL_DUMMY;
static Operation* const RETIRED_DUMMY;
ChromaticTreeOperation()
: state_(STATE_FREEZING),
root_(NULL),
root_operation_(NULL),
num_old_nodes_(0),
old_nodes_(),
old_operations_(),
new_child_(NULL)
#ifdef EMBB_DEBUG
, deleted_(false)
#endif
{}
void SetRoot(Node* root, Operation* root_operation) {
root_ = root;
root_operation_ = root_operation;
}
void SetOldNodes(Node* node, Operation* operation) {
num_old_nodes_ = 1;
old_nodes_[0] = node;
old_operations_[0] = operation;
}
void SetOldNodes(Node* node1, Operation* operation1,
Node* node2, Operation* operation2,
Node* node3, Operation* operation3) {
num_old_nodes_ = 3;
old_nodes_[0] = node1;
old_operations_[0] = operation1;
old_nodes_[1] = node2;
old_operations_[1] = operation2;
old_nodes_[2] = node3;
old_operations_[2] = operation3;
}
void SetNewChild(Node* new_child) {
new_child_ = new_child;
}
bool Help(AtomicNodePtr& node_guard, AtomicOperationPtr& oper_guard) {
#ifdef EMBB_DEBUG
assert(!deleted_);
#endif
// Freezing step
if (!FreezeAll(node_guard, oper_guard)) {
return IsCommitted();
}
// All frozen step
if (!SwitchState(STATE_FREEZING, STATE_ALL_FROZEN)) {
return IsCommitted();
}
// At this point operation may no longer fail - complete it
HelpCommit(node_guard);
return true;
}
void HelpCommit(AtomicNodePtr& node_guard) {
HazardNodePtr node_hp(node_guard);
// Mark step (retire old nodes)
for (size_t i = 0; i < num_old_nodes_; ++i) {
node_hp.ProtectSafe(old_nodes_[i]);
if (IsCommitted()) return;
old_nodes_[i]->Retire();
}
// Update step
node_hp.ProtectSafe(root_);
if (IsCommitted()) return;
root_->ReplaceChild(old_nodes_[0], new_child_);
// Commit step
SwitchState(STATE_ALL_FROZEN, STATE_COMMITTED);
}
void HelpAbort(Node* node) {
for (size_t i = 0; i < num_old_nodes_; ++i) {
if (old_nodes_[i] == node) {
Unfreeze(old_nodes_[i], old_operations_[i]);
break;
}
}
}
bool IsAborted() {
#ifdef EMBB_DEBUG
assert(!deleted_);
#endif
return state_ == STATE_ABORTED;
}
bool IsInProgress() {
#ifdef EMBB_DEBUG
assert(!deleted_);
#endif
State state = state_.Load();
return (state != STATE_ABORTED && state != STATE_COMMITTED);
}
bool IsCommitted() {
#ifdef EMBB_DEBUG
assert(!deleted_);
#endif
return state_ == STATE_COMMITTED;
}
void CleanUp() {
#ifdef EMBB_DEBUG
assert(!deleted_);
#endif
assert(!IsInProgress());
if (IsCommitted()) {
for (size_t i = 0; i < num_old_nodes_; ++i) {
assert(old_nodes_[i]->GetOperation() == this);
old_nodes_[i]->GetOperation() = RETIRED_DUMMY;
}
}
}
#ifdef EMBB_DEBUG
void SetDeleted() {
deleted_ = true;
}
#endif
private:
typedef enum {
STATE_ABORTED,
STATE_ROLLBACK,
STATE_FREEZING,
STATE_ALL_FROZEN,
STATE_COMMITTED
} State;
typedef embb::base::Atomic<State> AtomicState;
static const size_t MAX_NODES = 5;
static Operation* GetInitialDummmy() {
static ChromaticTreeOperation initial_dummy;
initial_dummy.state_ = STATE_COMMITTED;
return &initial_dummy;
}
static Operation* GetRetiredDummmy() {
static ChromaticTreeOperation retired_dummy;
retired_dummy.state_ = STATE_COMMITTED;
return &retired_dummy;
}
bool IsRollingBack() {
State state = state_.Load();
return (state == STATE_ROLLBACK || state == STATE_ABORTED);
}
bool IsFreezing() {
return state_ == STATE_FREEZING;
}
bool IsAllFrozen() {
State state = state_.Load();
return (state == STATE_ALL_FROZEN || state == STATE_COMMITTED);
}
bool FreezeAll(AtomicNodePtr& node_guard, AtomicOperationPtr& oper_guard) {
if (IsFreezing()) {
HazardNodePtr node_hp(node_guard);
HazardOperationPtr oper_hp(oper_guard);
node_hp.ProtectSafe(root_);
oper_hp.ProtectSafe(root_operation_);
if (IsFreezing()) {
Freeze(root_, root_operation_);
}
for (size_t i = 0; i < num_old_nodes_; ++i) {
node_hp.ProtectSafe(old_nodes_[i]);
oper_hp.ProtectSafe(old_operations_[i]);
if (!IsFreezing()) break;
Freeze(old_nodes_[i], old_operations_[i]);
}
}
if (IsRollingBack()) {
UnfreezeAll(node_guard);
return false;
}
return true;
}
bool Freeze(Node* node, Operation* operation) {
bool freezed = node->GetOperation().CompareAndSwap(operation, this);
if (!freezed && operation != this && !IsAllFrozen()) {
// Node is frozen for another operation - abort "this"
SwitchState(STATE_FREEZING, STATE_ROLLBACK);
// If the "operation" was aborted and rolled back, some other thread could
// have helped "this" to freeze all nodes, and the previous "SwitchState"
// would fail
return IsAllFrozen();
}
if (freezed && IsRollingBack()) {
// "False-positive" CAS: "this" was aborted and the "operation" might have
// been rolled back; While "node" is hazard protected, unfreeze it again
Unfreeze(node, operation);
return false;
}
return true;
}
void UnfreezeAll(AtomicNodePtr& node_guard) {
HazardNodePtr node_hp(node_guard);
node_hp.ProtectSafe(root_);
if (IsAborted()) return;
Unfreeze(root_, root_operation_);
for (size_t i = 0; i < num_old_nodes_; ++i) {
node_hp.ProtectSafe(old_nodes_[i]);
if (IsAborted()) return;
Unfreeze(old_nodes_[i], old_operations_[i]);
}
SwitchState(STATE_ROLLBACK, STATE_ABORTED);
}
void Unfreeze(Node* node, Operation* operation) {
Operation* expected = this;
node->GetOperation().CompareAndSwap(expected, operation);
}
bool SwitchState(State old_state, State new_state) {
if (state_ != new_state) {
bool switched = state_.CompareAndSwap(old_state, new_state);
if (!switched && old_state != new_state) {
return false;
}
}
return true;
}
ChromaticTreeOperation(const ChromaticTreeOperation&);
ChromaticTreeOperation& operator=(const ChromaticTreeOperation&);
AtomicState state_;
Node* root_;
Operation* root_operation_;
size_t num_old_nodes_;
Node* old_nodes_[MAX_NODES];
Operation* old_operations_[MAX_NODES];
Node* new_child_;
#ifdef EMBB_DEBUG
embb::base::Atomic<bool> deleted_;
#endif
};
template<typename Key, typename Value>
typename ChromaticTreeOperation<Key, Value>::Operation* const
ChromaticTreeOperation<Key, Value>::INITIAL_DUMMY =
ChromaticTreeOperation::GetInitialDummmy();
template<typename Key, typename Value>
typename ChromaticTreeOperation<Key, Value>::Operation* const
ChromaticTreeOperation<Key, Value>::RETIRED_DUMMY =
ChromaticTreeOperation::GetRetiredDummmy();
} // namespace internal } // namespace internal
namespace test { namespace test {
...@@ -329,28 +633,34 @@ class ChromaticTree { ...@@ -329,28 +633,34 @@ class ChromaticTree {
typedef embb::base::Atomic<Node*> AtomicNodePtr; typedef embb::base::Atomic<Node*> AtomicNodePtr;
/** Typedef for an pointer to a node protected by a Hazard Pointer. */ /** Typedef for an pointer to a node protected by a Hazard Pointer. */
typedef internal::UniqueHazardPointer<Node> HazardNodePtr; typedef internal::UniqueHazardPointer<Node> HazardNodePtr;
/** Typedef for the chromatic tree operation. */
typedef internal::ChromaticTreeOperation<Key, Value> Operation;
/** Typedef for an atomic pointer to a tree operation. */
typedef embb::base::Atomic<Operation*> AtomicOperationPtr;
/** Typedef for an pointer to a node protected by a Hazard Pointer. */
typedef internal::UniqueHazardPointer<Operation> HazardOperationPtr;
/** Typedef for the UniqueLock class. */ /** Typedef for the UniqueLock class. */
typedef embb::base::UniqueLock<embb::base::Mutex> UniqueLock; typedef embb::base::UniqueLock<embb::base::Mutex> UniqueLock;
/** Typedef for an object pool for tree nodes. */ /** Typedef for an object pool for tree nodes. */
typedef ObjectPool<Node, ValuePool> NodePool; typedef ObjectPool<Node, ValuePool> NodePool;
/** Typedef for an object pool for tree operations. */
typedef ObjectPool<Operation, ValuePool> OperationPool;
typedef enum {
HIDX_HELPING = 0,
HIDX_LEAF,
HIDX_PARENT,
HIDX_GRANDPARENT,
HIDX_SIBLING,
HIDX_CURRENT_OP,
HIDX_MAX
} HazardIndex;
/** /**
* Follows a path from the root of the tree to some leaf searching for the * Follows a path from the root of the tree to some leaf searching for the
* given key (the leaf found by this method may or may not contain the given * given key (the leaf found by this method may or may not contain the given
* key). Returns the reached leaf together with its ancestors. * key). Returns the reached leaf together with its ancestors.
* *
* \param[IN] key Key to be searched for
* \param[IN,OUT] leaf Reference to the reached leaf
* \param[IN,OUT] parent Reference to the parent of the reached leaf
*/
void Search(const Key& key, HazardNodePtr& leaf, HazardNodePtr& parent);
/**
* Follows a path from the root of the tree to some leaf searching for the
* given key (the leaf found by this method may or may not contain the given
* key). Returns the reached leaf together with its ancestors.
*
* \param[IN] key Key to be searched for * \param[IN] key Key to be searched for
* \param[IN,OUT] leaf Reference to the reached leaf * \param[IN,OUT] leaf Reference to the reached leaf
* \param[IN,OUT] parent Reference to the parent of the reached leaf * \param[IN,OUT] parent Reference to the parent of the reached leaf
...@@ -427,13 +737,15 @@ class ChromaticTree { ...@@ -427,13 +737,15 @@ class ChromaticTree {
*/ */
bool IsBalanced(const Node* node) const; bool IsBalanced(const Node* node) const;
/** void RetireNode(HazardNodePtr& node);
* Free a tree node using the Hazard Pointers memory reclamation routines.
* void RetireOperation(HazardOperationPtr& operation);
* \param[IN] node A node to be freed.
* \param[IN] node_lock A lock holding the mutex of the \c node to be freed. AtomicNodePtr& GetNodeGuard(HazardIndex index);
*/
void RetireHazardousNode(HazardNodePtr& node, UniqueLock& node_lock); AtomicOperationPtr& GetOperationGuard(HazardIndex index);
bool WeakLLX(HazardNodePtr& node, HazardOperationPtr& operation);
/** /**
* Free a tree node by returning it to the node pool. * Free a tree node by returning it to the node pool.
...@@ -442,6 +754,8 @@ class ChromaticTree { ...@@ -442,6 +754,8 @@ class ChromaticTree {
*/ */
void FreeNode(Node* node); void FreeNode(Node* node);
void FreeOperation(Operation* operation);
/** /**
* Follows the path from the root to some leaf (directed by the given key) and * Follows the path from the root to some leaf (directed by the given key) and
* checks for any tree balancing violations. If a violation is found, tries * checks for any tree balancing violations. If a violation is found, tries
...@@ -480,12 +794,14 @@ class ChromaticTree { ...@@ -480,12 +794,14 @@ class ChromaticTree {
/** Hazard pointer instance for protection of node pointers */ /** Hazard pointer instance for protection of node pointers */
internal::HazardPointer<Node*> node_hazard_manager_; internal::HazardPointer<Node*> node_hazard_manager_;
internal::HazardPointer<Operation*> operation_hazard_manager_;
const Key undefined_key_; /**< A dummy key used by the tree */ const Key undefined_key_; /**< A dummy key used by the tree */
const Value undefined_value_; /**< A dummy value used by the tree */ const Value undefined_value_; /**< A dummy value used by the tree */
const Compare compare_; /**< Comparator object for the keys */ const Compare compare_; /**< Comparator object for the keys */
size_t capacity_; /**< User-requested capacity of the tree */ size_t capacity_; /**< User-requested capacity of the tree */
NodePool node_pool_; /**< Comparator object for the keys */ NodePool node_pool_; /**< Pool of tree nodes */
OperationPool operation_pool_; /**< Pool of operation objects */
Node* const entry_; /**< Pointer to the sentinel node used as Node* const entry_; /**< Pointer to the sentinel node used as
* the entry point into the tree */ * the entry point into the tree */
......
...@@ -185,6 +185,12 @@ class ObjectPool { ...@@ -185,6 +185,12 @@ class ObjectPool {
Type* Allocate(Param1 const& param1, Param2 const& param2, Type* Allocate(Param1 const& param1, Param2 const& param2,
Param3 const& param3, Param4 const& param4, Param5 const& param5); Param3 const& param3, Param4 const& param4, Param5 const& param5);
template<typename Param1, typename Param2, typename Param3, typename Param4,
typename Param5, typename Param6>
Type* Allocate(Param1 const& param1, Param2 const& param2,
Param3 const& param3, Param4 const& param4, Param5 const& param5,
Param6 const& param6);
#endif #endif
}; };
} // namespace containers } // namespace containers
......
...@@ -66,11 +66,11 @@ TreeTest<Tree>::TreeTest() ...@@ -66,11 +66,11 @@ TreeTest<Tree>::TreeTest()
Add(&TreeTest::TreeTestConcurrentGet_ReaderMethod, this, Add(&TreeTest::TreeTestConcurrentGet_ReaderMethod, this,
NUM_TEST_THREADS / 2, NUM_ITERATIONS). NUM_TEST_THREADS / 2, NUM_ITERATIONS).
Post(&TreeTest::TreeTestConcurrentGet_Post, this); Post(&TreeTest::TreeTestConcurrentGet_Post, this);
CreateUnit("TreeTestBalance"). // CreateUnit("TreeTestBalance").
Pre(&TreeTest::TreeTestBalance_Pre, this). // Pre(&TreeTest::TreeTestBalance_Pre, this).
Add(&TreeTest::TreeTestBalance_ThreadMethod, this, // Add(&TreeTest::TreeTestBalance_ThreadMethod, this,
NUM_TEST_THREADS, 1). // NUM_TEST_THREADS, 1).
Post(&TreeTest::TreeTestBalance_Post, this); // Post(&TreeTest::TreeTestBalance_Post, this);
} }
template<typename Tree> template<typename Tree>
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment