aboutsummaryrefslogtreecommitdiffstats
path: root/yt
diff options
context:
space:
mode:
authorrobot-piglet <robot-piglet@yandex-team.com>2024-02-26 20:47:55 +0300
committerrobot-piglet <robot-piglet@yandex-team.com>2024-02-26 20:57:42 +0300
commit4ad6c196bc757b64229680c3bd4db544a3fc8fdd (patch)
tree54c033116f65039bb6f77def6e9a0d30a4b6f31d /yt
parent2078655164abf61e9f06a470c78d83bb36372eb7 (diff)
downloadydb-4ad6c196bc757b64229680c3bd4db544a3fc8fdd.tar.gz
Intermediate changes
Diffstat (limited to 'yt')
-rw-r--r--yt/yt/core/misc/atomic_ptr-inl.h8
-rw-r--r--yt/yt/core/misc/atomic_ptr.h4
-rw-r--r--yt/yt/core/misc/concurrent_cache-inl.h17
-rw-r--r--yt/yt/core/misc/concurrent_cache.h2
-rw-r--r--yt/yt/core/misc/lock_free_hash_table-inl.h19
-rw-r--r--yt/yt/core/misc/lock_free_hash_table.h26
-rw-r--r--yt/yt/core/misc/unittests/concurrent_cache_ut.cpp2
-rw-r--r--yt/yt/core/misc/unittests/lock_free_hash_table_ut.cpp2
8 files changed, 53 insertions, 27 deletions
diff --git a/yt/yt/core/misc/atomic_ptr-inl.h b/yt/yt/core/misc/atomic_ptr-inl.h
index 6d57b1e4c7..021c82318b 100644
--- a/yt/yt/core/misc/atomic_ptr-inl.h
+++ b/yt/yt/core/misc/atomic_ptr-inl.h
@@ -204,15 +204,15 @@ TAtomicPtr<T, EnableAcquireHazard>::operator bool() const
////////////////////////////////////////////////////////////////////////////////
template <class T, bool EnableAcquireHazard>
-bool operator==(const TAtomicPtr<T, EnableAcquireHazard>& lhs, const TIntrusivePtr<T>& rhs)
+bool operator==(const TAtomicPtr<T, EnableAcquireHazard>& lhs, const T* rhs)
{
- return lhs.Ptr_.load() == rhs.Get();
+ return lhs.Ptr_.load() == rhs;
}
template <class T, bool EnableAcquireHazard>
-bool operator==(const TIntrusivePtr<T>& lhs, const TAtomicPtr<T, EnableAcquireHazard>& rhs)
+bool operator==(const T* lhs, const TAtomicPtr<T, EnableAcquireHazard>& rhs)
{
- return lhs.Get() == rhs.Ptr_.load();
+ return lhs == rhs.Ptr_.load();
}
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt/core/misc/atomic_ptr.h b/yt/yt/core/misc/atomic_ptr.h
index fe15898273..0fd38487b5 100644
--- a/yt/yt/core/misc/atomic_ptr.h
+++ b/yt/yt/core/misc/atomic_ptr.h
@@ -57,10 +57,10 @@ private:
explicit TAtomicPtr(T* ptr);
template <class T_, bool EnableAcquireHazard_>
- friend bool operator==(const TAtomicPtr<T_, EnableAcquireHazard_>& lhs, const TIntrusivePtr<T_>& rhs);
+ friend bool operator==(const TAtomicPtr<T_, EnableAcquireHazard_>& lhs, const T_* rhs);
template <class T_, bool EnableAcquireHazard_>
- friend bool operator==(const TIntrusivePtr<T_>& lhs, const TAtomicPtr<T_, EnableAcquireHazard_>& rhs);
+ friend bool operator==(const T_* lhs, const TAtomicPtr<T_, EnableAcquireHazard_>& rhs);
std::atomic<T*> Ptr_ = nullptr;
diff --git a/yt/yt/core/misc/concurrent_cache-inl.h b/yt/yt/core/misc/concurrent_cache-inl.h
index f6ca30b6ab..8fe3021148 100644
--- a/yt/yt/core/misc/concurrent_cache-inl.h
+++ b/yt/yt/core/misc/concurrent_cache-inl.h
@@ -25,14 +25,15 @@ struct TConcurrentCache<T>::TLookupTable final
, Capacity(capacity)
{ }
- bool Insert(TValuePtr item)
+ typename THashTable::TItemRef Insert(TValuePtr item)
{
auto fingerprint = THash<T>()(item.Get());
- if (THashTable::Insert(fingerprint, std::move(item))) {
+ auto result = THashTable::Insert(fingerprint, std::move(item));
+
+ if (result) {
++Size;
- return true;
}
- return false;
+ return result;
}
};
@@ -40,7 +41,7 @@ template <class T>
TIntrusivePtr<typename TConcurrentCache<T>::TLookupTable>
TConcurrentCache<T>::RenewTable(const TIntrusivePtr<TLookupTable>& head, size_t capacity)
{
- if (head != Head_) {
+ if (head.Get() != Head_) {
return Head_.Acquire();
}
@@ -207,6 +208,12 @@ void TConcurrentCache<T>::SetCapacity(size_t capacity)
}
}
+template <class T>
+bool TConcurrentCache<T>::IsHead(const TIntrusivePtr<TLookupTable>& head) const
+{
+ return Head_ == head.Get();
+}
+
/////////////////////////////////////////////////////////////////////////////
} // namespace NYT
diff --git a/yt/yt/core/misc/concurrent_cache.h b/yt/yt/core/misc/concurrent_cache.h
index 158a358eef..7f0f52054e 100644
--- a/yt/yt/core/misc/concurrent_cache.h
+++ b/yt/yt/core/misc/concurrent_cache.h
@@ -91,6 +91,8 @@ public:
void SetCapacity(size_t capacity);
+ bool IsHead(const TIntrusivePtr<TLookupTable>& head) const;
+
private:
std::atomic<size_t> Capacity_;
TAtomicPtr<TLookupTable> Head_;
diff --git a/yt/yt/core/misc/lock_free_hash_table-inl.h b/yt/yt/core/misc/lock_free_hash_table-inl.h
index 4f36d33831..09d04035fd 100644
--- a/yt/yt/core/misc/lock_free_hash_table-inl.h
+++ b/yt/yt/core/misc/lock_free_hash_table-inl.h
@@ -49,8 +49,10 @@ size_t TLockFreeHashTable<T>::GetByteSize() const
}
template <class T>
-bool TLockFreeHashTable<T>::Insert(TFingerprint fingerprint, TValuePtr value)
+typename TLockFreeHashTable<T>::TItemRef TLockFreeHashTable<T>::Insert(TFingerprint fingerprint, TValuePtr value)
{
+ using TItemRef = typename TLockFreeHashTable<T>::TItemRef;
+
auto index = IndexFromFingerprint(fingerprint) % Size_;
auto stamp = StampFromFingerprint(fingerprint);
@@ -68,7 +70,7 @@ bool TLockFreeHashTable<T>::Insert(TFingerprint fingerprint, TValuePtr value)
std::memory_order::acquire);
if (success) {
value.Release();
- return true;
+ return TItemRef(&HashTable_[index]);
}
}
@@ -78,7 +80,7 @@ bool TLockFreeHashTable<T>::Insert(TFingerprint fingerprint, TValuePtr value)
}, ValueFromEntry(tableEntry));
if (TEqualTo<T>()(item.Get(), value.Get())) {
- return false;
+ return TItemRef(nullptr);
}
++index;
@@ -88,7 +90,7 @@ bool TLockFreeHashTable<T>::Insert(TFingerprint fingerprint, TValuePtr value)
--probeCount;
}
- return false;
+ return TItemRef(nullptr);
}
template <class T>
@@ -140,6 +142,7 @@ typename TLockFreeHashTable<T>::TItemRef TLockFreeHashTable<T>::FindRef(TFingerp
for (size_t probeCount = Size_; probeCount != 0;) {
auto tableEntry = HashTable_[index].load(std::memory_order::relaxed);
+ // TODO(lukyan): Rename to entryStamp.
auto tableStamp = StampFromEntry(tableEntry);
if (tableStamp == 0) {
@@ -179,16 +182,18 @@ typename TLockFreeHashTable<T>::TStamp
template <class T>
T* TLockFreeHashTable<T>::ValueFromEntry(TEntry entry)
{
- return reinterpret_cast<T*>(entry & ((1ULL << ValueLog) - 1));
+ constexpr auto Mask = (1ULL << ValueLog) - 1;
+ return reinterpret_cast<T*>(entry & (Mask ^ 1ULL));
}
template <class T>
typename TLockFreeHashTable<T>::TEntry
- TLockFreeHashTable<T>::MakeEntry(TStamp stamp, T* value)
+ TLockFreeHashTable<T>::MakeEntry(TStamp stamp, T* value, bool sealed)
{
YT_ASSERT(stamp != 0);
+ YT_ASSERT((reinterpret_cast<TEntry>(value) & 1ULL) == 0);
YT_ASSERT(StampFromEntry(reinterpret_cast<TEntry>(value)) == 0);
- return (static_cast<TEntry>(stamp) << ValueLog) | reinterpret_cast<TEntry>(value);
+ return (static_cast<TEntry>(stamp) << ValueLog) | reinterpret_cast<TEntry>(value) | (sealed ? 1ULL : 0ULL);
}
template <class T>
diff --git a/yt/yt/core/misc/lock_free_hash_table.h b/yt/yt/core/misc/lock_free_hash_table.h
index b837dc1811..78d7482261 100644
--- a/yt/yt/core/misc/lock_free_hash_table.h
+++ b/yt/yt/core/misc/lock_free_hash_table.h
@@ -51,20 +51,20 @@ public:
return TValuePtr(item.Get());
}
- //! Updates existing element.
- void Update(TValuePtr value)
+ //! Replace existing element.
+ void Replace(TValuePtr value, bool sealed = true)
{
// Fingerprint must be equal.
auto stamp = StampFromEntry(Entry_->load(std::memory_order::acquire));
- auto entry = MakeEntry(stamp, value.Release());
+ auto entry = MakeEntry(stamp, value.Release(), sealed);
// TODO(lukyan): Keep dereferenced value and update via CAS.
auto oldEntry = Entry_->exchange(entry);
DeleteEntry(oldEntry);
}
- bool Update(TValuePtr value, const T* expected)
+ bool Replace(TValuePtr value, const T* expected, bool sealed = true)
{
if (value.Get() == expected) {
return false;
@@ -78,7 +78,7 @@ public:
return false;
}
- auto entry = MakeEntry(stamp, value.Get());
+ auto entry = MakeEntry(stamp, value.Get(), sealed);
if (!Entry_->compare_exchange_strong(currentEntry, entry)) {
return false;
@@ -89,6 +89,18 @@ public:
return true;
}
+ void SealItem()
+ {
+ auto oldValue = Entry_->fetch_add(1);
+ YT_VERIFY(!(oldValue & 1));
+ }
+
+ bool IsSealed() const
+ {
+ auto currentEntry = Entry_->load(std::memory_order::acquire);
+ return currentEntry & 1ULL;
+ }
+
private:
std::atomic<TEntry>* Entry_ = nullptr;
@@ -107,7 +119,7 @@ public:
size_t GetByteSize() const;
//! Inserts element. Called concurrently from multiple threads.
- bool Insert(TFingerprint fingerprint, TValuePtr value);
+ typename TLockFreeHashTable<T>::TItemRef Insert(TFingerprint fingerprint, TValuePtr value);
template <class TKey>
TIntrusivePtr<T> Find(TFingerprint fingerprint, const TKey& key);
@@ -126,7 +138,7 @@ private:
static T* ValueFromEntry(TEntry entry);
- static TEntry MakeEntry(TStamp stamp, T* value);
+ static TEntry MakeEntry(TStamp stamp, T* value, bool sealed = false);
static size_t IndexFromFingerprint(TFingerprint fingerprint);
diff --git a/yt/yt/core/misc/unittests/concurrent_cache_ut.cpp b/yt/yt/core/misc/unittests/concurrent_cache_ut.cpp
index af1e1ceef4..26dc973c8e 100644
--- a/yt/yt/core/misc/unittests/concurrent_cache_ut.cpp
+++ b/yt/yt/core/misc/unittests/concurrent_cache_ut.cpp
@@ -112,7 +112,7 @@ TEST_P(TConcurrentCacheTest, Stress)
if (!foundRef) {
auto value = NewWithExtraSpace<TElement>(&allocator, columnCount);
memcpy(value.Get(), key, sizeof(TElement) + columnCount);
- bool inserted = inserter.GetTable()->Insert(std::move(value));
+ auto inserted = static_cast<bool>(inserter.GetTable()->Insert(std::move(value)));
insertCount += inserted;
} else if (reinsert) {
diff --git a/yt/yt/core/misc/unittests/lock_free_hash_table_ut.cpp b/yt/yt/core/misc/unittests/lock_free_hash_table_ut.cpp
index 6c196f1bce..6ff4d148c0 100644
--- a/yt/yt/core/misc/unittests/lock_free_hash_table_ut.cpp
+++ b/yt/yt/core/misc/unittests/lock_free_hash_table_ut.cpp
@@ -121,7 +121,7 @@ TEST(TLockFreeHashTableTest, Simple)
auto fingerprint = hash(item.Get());
auto foundRef = table.FindRef(fingerprint, item.Get());
EXPECT_TRUE(static_cast<bool>(foundRef));
- foundRef.Update(item);
+ foundRef.Replace(item);
}
for (const auto& item : checkTable) {