diff options
author | robot-piglet <robot-piglet@yandex-team.com> | 2024-02-26 20:47:55 +0300 |
---|---|---|
committer | robot-piglet <robot-piglet@yandex-team.com> | 2024-02-26 20:57:42 +0300 |
commit | 4ad6c196bc757b64229680c3bd4db544a3fc8fdd (patch) | |
tree | 54c033116f65039bb6f77def6e9a0d30a4b6f31d /yt | |
parent | 2078655164abf61e9f06a470c78d83bb36372eb7 (diff) | |
download | ydb-4ad6c196bc757b64229680c3bd4db544a3fc8fdd.tar.gz |
Intermediate changes
Diffstat (limited to 'yt')
-rw-r--r-- | yt/yt/core/misc/atomic_ptr-inl.h | 8 | ||||
-rw-r--r-- | yt/yt/core/misc/atomic_ptr.h | 4 | ||||
-rw-r--r-- | yt/yt/core/misc/concurrent_cache-inl.h | 17 | ||||
-rw-r--r-- | yt/yt/core/misc/concurrent_cache.h | 2 | ||||
-rw-r--r-- | yt/yt/core/misc/lock_free_hash_table-inl.h | 19 | ||||
-rw-r--r-- | yt/yt/core/misc/lock_free_hash_table.h | 26 | ||||
-rw-r--r-- | yt/yt/core/misc/unittests/concurrent_cache_ut.cpp | 2 | ||||
-rw-r--r-- | yt/yt/core/misc/unittests/lock_free_hash_table_ut.cpp | 2 |
8 files changed, 53 insertions, 27 deletions
diff --git a/yt/yt/core/misc/atomic_ptr-inl.h b/yt/yt/core/misc/atomic_ptr-inl.h index 6d57b1e4c7..021c82318b 100644 --- a/yt/yt/core/misc/atomic_ptr-inl.h +++ b/yt/yt/core/misc/atomic_ptr-inl.h @@ -204,15 +204,15 @@ TAtomicPtr<T, EnableAcquireHazard>::operator bool() const //////////////////////////////////////////////////////////////////////////////// template <class T, bool EnableAcquireHazard> -bool operator==(const TAtomicPtr<T, EnableAcquireHazard>& lhs, const TIntrusivePtr<T>& rhs) +bool operator==(const TAtomicPtr<T, EnableAcquireHazard>& lhs, const T* rhs) { - return lhs.Ptr_.load() == rhs.Get(); + return lhs.Ptr_.load() == rhs; } template <class T, bool EnableAcquireHazard> -bool operator==(const TIntrusivePtr<T>& lhs, const TAtomicPtr<T, EnableAcquireHazard>& rhs) +bool operator==(const T* lhs, const TAtomicPtr<T, EnableAcquireHazard>& rhs) { - return lhs.Get() == rhs.Ptr_.load(); + return lhs == rhs.Ptr_.load(); } //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/core/misc/atomic_ptr.h b/yt/yt/core/misc/atomic_ptr.h index fe15898273..0fd38487b5 100644 --- a/yt/yt/core/misc/atomic_ptr.h +++ b/yt/yt/core/misc/atomic_ptr.h @@ -57,10 +57,10 @@ private: explicit TAtomicPtr(T* ptr); template <class T_, bool EnableAcquireHazard_> - friend bool operator==(const TAtomicPtr<T_, EnableAcquireHazard_>& lhs, const TIntrusivePtr<T_>& rhs); + friend bool operator==(const TAtomicPtr<T_, EnableAcquireHazard_>& lhs, const T_* rhs); template <class T_, bool EnableAcquireHazard_> - friend bool operator==(const TIntrusivePtr<T_>& lhs, const TAtomicPtr<T_, EnableAcquireHazard_>& rhs); + friend bool operator==(const T_* lhs, const TAtomicPtr<T_, EnableAcquireHazard_>& rhs); std::atomic<T*> Ptr_ = nullptr; diff --git a/yt/yt/core/misc/concurrent_cache-inl.h b/yt/yt/core/misc/concurrent_cache-inl.h index f6ca30b6ab..8fe3021148 100644 --- a/yt/yt/core/misc/concurrent_cache-inl.h +++ b/yt/yt/core/misc/concurrent_cache-inl.h @@ -25,14 +25,15 @@ struct TConcurrentCache<T>::TLookupTable final , Capacity(capacity) { } - bool Insert(TValuePtr item) + typename THashTable::TItemRef Insert(TValuePtr item) { auto fingerprint = THash<T>()(item.Get()); - if (THashTable::Insert(fingerprint, std::move(item))) { + auto result = THashTable::Insert(fingerprint, std::move(item)); + + if (result) { ++Size; - return true; } - return false; + return result; } }; @@ -40,7 +41,7 @@ template <class T> TIntrusivePtr<typename TConcurrentCache<T>::TLookupTable> TConcurrentCache<T>::RenewTable(const TIntrusivePtr<TLookupTable>& head, size_t capacity) { - if (head != Head_) { + if (head.Get() != Head_) { return Head_.Acquire(); } @@ -207,6 +208,12 @@ void TConcurrentCache<T>::SetCapacity(size_t capacity) } } +template <class T> +bool TConcurrentCache<T>::IsHead(const TIntrusivePtr<TLookupTable>& head) const +{ + return Head_ == head.Get(); +} + ///////////////////////////////////////////////////////////////////////////// } // namespace NYT diff --git a/yt/yt/core/misc/concurrent_cache.h b/yt/yt/core/misc/concurrent_cache.h index 158a358eef..7f0f52054e 100644 --- a/yt/yt/core/misc/concurrent_cache.h +++ b/yt/yt/core/misc/concurrent_cache.h @@ -91,6 +91,8 @@ public: void SetCapacity(size_t capacity); + bool IsHead(const TIntrusivePtr<TLookupTable>& head) const; + private: std::atomic<size_t> Capacity_; TAtomicPtr<TLookupTable> Head_; diff --git a/yt/yt/core/misc/lock_free_hash_table-inl.h b/yt/yt/core/misc/lock_free_hash_table-inl.h index 4f36d33831..09d04035fd 100644 --- a/yt/yt/core/misc/lock_free_hash_table-inl.h +++ b/yt/yt/core/misc/lock_free_hash_table-inl.h @@ -49,8 +49,10 @@ size_t TLockFreeHashTable<T>::GetByteSize() const } template <class T> -bool TLockFreeHashTable<T>::Insert(TFingerprint fingerprint, TValuePtr value) +typename TLockFreeHashTable<T>::TItemRef TLockFreeHashTable<T>::Insert(TFingerprint fingerprint, TValuePtr value) { + using TItemRef = typename TLockFreeHashTable<T>::TItemRef; + auto index = IndexFromFingerprint(fingerprint) % Size_; auto stamp = StampFromFingerprint(fingerprint); @@ -68,7 +70,7 @@ bool TLockFreeHashTable<T>::Insert(TFingerprint fingerprint, TValuePtr value) std::memory_order::acquire); if (success) { value.Release(); - return true; + return TItemRef(&HashTable_[index]); } } @@ -78,7 +80,7 @@ bool TLockFreeHashTable<T>::Insert(TFingerprint fingerprint, TValuePtr value) }, ValueFromEntry(tableEntry)); if (TEqualTo<T>()(item.Get(), value.Get())) { - return false; + return TItemRef(nullptr); } ++index; @@ -88,7 +90,7 @@ bool TLockFreeHashTable<T>::Insert(TFingerprint fingerprint, TValuePtr value) --probeCount; } - return false; + return TItemRef(nullptr); } template <class T> @@ -140,6 +142,7 @@ typename TLockFreeHashTable<T>::TItemRef TLockFreeHashTable<T>::FindRef(TFingerp for (size_t probeCount = Size_; probeCount != 0;) { auto tableEntry = HashTable_[index].load(std::memory_order::relaxed); + // TODO(lukyan): Rename to entryStamp. auto tableStamp = StampFromEntry(tableEntry); if (tableStamp == 0) { @@ -179,16 +182,18 @@ typename TLockFreeHashTable<T>::TStamp template <class T> T* TLockFreeHashTable<T>::ValueFromEntry(TEntry entry) { - return reinterpret_cast<T*>(entry & ((1ULL << ValueLog) - 1)); + constexpr auto Mask = (1ULL << ValueLog) - 1; + return reinterpret_cast<T*>(entry & (Mask ^ 1ULL)); } template <class T> typename TLockFreeHashTable<T>::TEntry - TLockFreeHashTable<T>::MakeEntry(TStamp stamp, T* value) + TLockFreeHashTable<T>::MakeEntry(TStamp stamp, T* value, bool sealed) { YT_ASSERT(stamp != 0); + YT_ASSERT((reinterpret_cast<TEntry>(value) & 1ULL) == 0); YT_ASSERT(StampFromEntry(reinterpret_cast<TEntry>(value)) == 0); - return (static_cast<TEntry>(stamp) << ValueLog) | reinterpret_cast<TEntry>(value); + return (static_cast<TEntry>(stamp) << ValueLog) | reinterpret_cast<TEntry>(value) | (sealed ? 1ULL : 0ULL); } template <class T> diff --git a/yt/yt/core/misc/lock_free_hash_table.h b/yt/yt/core/misc/lock_free_hash_table.h index b837dc1811..78d7482261 100644 --- a/yt/yt/core/misc/lock_free_hash_table.h +++ b/yt/yt/core/misc/lock_free_hash_table.h @@ -51,20 +51,20 @@ public: return TValuePtr(item.Get()); } - //! Updates existing element. - void Update(TValuePtr value) + //! Replace existing element. + void Replace(TValuePtr value, bool sealed = true) { // Fingerprint must be equal. auto stamp = StampFromEntry(Entry_->load(std::memory_order::acquire)); - auto entry = MakeEntry(stamp, value.Release()); + auto entry = MakeEntry(stamp, value.Release(), sealed); // TODO(lukyan): Keep dereferenced value and update via CAS. auto oldEntry = Entry_->exchange(entry); DeleteEntry(oldEntry); } - bool Update(TValuePtr value, const T* expected) + bool Replace(TValuePtr value, const T* expected, bool sealed = true) { if (value.Get() == expected) { return false; @@ -78,7 +78,7 @@ public: return false; } - auto entry = MakeEntry(stamp, value.Get()); + auto entry = MakeEntry(stamp, value.Get(), sealed); if (!Entry_->compare_exchange_strong(currentEntry, entry)) { return false; @@ -89,6 +89,18 @@ public: return true; } + void SealItem() + { + auto oldValue = Entry_->fetch_add(1); + YT_VERIFY(!(oldValue & 1)); + } + + bool IsSealed() const + { + auto currentEntry = Entry_->load(std::memory_order::acquire); + return currentEntry & 1ULL; + } + private: std::atomic<TEntry>* Entry_ = nullptr; @@ -107,7 +119,7 @@ public: size_t GetByteSize() const; //! Inserts element. Called concurrently from multiple threads. - bool Insert(TFingerprint fingerprint, TValuePtr value); + typename TLockFreeHashTable<T>::TItemRef Insert(TFingerprint fingerprint, TValuePtr value); template <class TKey> TIntrusivePtr<T> Find(TFingerprint fingerprint, const TKey& key); @@ -126,7 +138,7 @@ private: static T* ValueFromEntry(TEntry entry); - static TEntry MakeEntry(TStamp stamp, T* value); + static TEntry MakeEntry(TStamp stamp, T* value, bool sealed = false); static size_t IndexFromFingerprint(TFingerprint fingerprint); diff --git a/yt/yt/core/misc/unittests/concurrent_cache_ut.cpp b/yt/yt/core/misc/unittests/concurrent_cache_ut.cpp index af1e1ceef4..26dc973c8e 100644 --- a/yt/yt/core/misc/unittests/concurrent_cache_ut.cpp +++ b/yt/yt/core/misc/unittests/concurrent_cache_ut.cpp @@ -112,7 +112,7 @@ TEST_P(TConcurrentCacheTest, Stress) if (!foundRef) { auto value = NewWithExtraSpace<TElement>(&allocator, columnCount); memcpy(value.Get(), key, sizeof(TElement) + columnCount); - bool inserted = inserter.GetTable()->Insert(std::move(value)); + auto inserted = static_cast<bool>(inserter.GetTable()->Insert(std::move(value))); insertCount += inserted; } else if (reinsert) { diff --git a/yt/yt/core/misc/unittests/lock_free_hash_table_ut.cpp b/yt/yt/core/misc/unittests/lock_free_hash_table_ut.cpp index 6c196f1bce..6ff4d148c0 100644 --- a/yt/yt/core/misc/unittests/lock_free_hash_table_ut.cpp +++ b/yt/yt/core/misc/unittests/lock_free_hash_table_ut.cpp @@ -121,7 +121,7 @@ TEST(TLockFreeHashTableTest, Simple) auto fingerprint = hash(item.Get()); auto foundRef = table.FindRef(fingerprint, item.Get()); EXPECT_TRUE(static_cast<bool>(foundRef)); - foundRef.Update(item); + foundRef.Replace(item); } for (const auto& item : checkTable) { |