diff options
author | aleexfi <aleexfi@yandex-team.com> | 2023-03-17 18:24:11 +0300 |
---|---|---|
committer | aleexfi <aleexfi@yandex-team.com> | 2023-03-17 18:24:11 +0300 |
commit | 7825c9057d3fad670eadd60509802152127d6e49 (patch) | |
tree | 465946e4c2efd91fda1b5806ec3c7fe760d3f6ca | |
parent | 9041b256167b6c37a6d99acdbf5f606f7f47bb02 (diff) | |
download | ydb-7825c9057d3fad670eadd60509802152127d6e49.tar.gz |
YT-17689: Move TFreeList to library/cpp/yt/memory
-rw-r--r-- | library/cpp/yt/memory/free_list-inl.h | 222 | ||||
-rw-r--r-- | library/cpp/yt/memory/free_list.h | 78 | ||||
-rw-r--r-- | library/cpp/yt/memory/public.h | 3 | ||||
-rw-r--r-- | library/cpp/yt/memory/unittests/free_list_ut.cpp | 26 | ||||
-rw-r--r-- | library/cpp/yt/memory/unittests/ya.make | 1 | ||||
-rw-r--r-- | library/cpp/yt/threading/public.h | 4 | ||||
-rw-r--r-- | library/cpp/yt/threading/rw_spin_lock.h | 3 | ||||
-rw-r--r-- | library/cpp/yt/threading/spin_lock.h | 3 | ||||
-rw-r--r-- | library/cpp/ytalloc/impl/core-inl.h | 96 |
9 files changed, 342 insertions, 94 deletions
diff --git a/library/cpp/yt/memory/free_list-inl.h b/library/cpp/yt/memory/free_list-inl.h new file mode 100644 index 0000000000..45bb2b8f10 --- /dev/null +++ b/library/cpp/yt/memory/free_list-inl.h @@ -0,0 +1,222 @@ +#ifndef FREE_LIST_INL_H_ +#error "Direct inclusion of this file is not allowed, include free_list.h" +// For the sake of sane code completion. +#include "free_list.h" +#endif + +namespace NYT { + +//////////////////////////////////////////////////////////////////////////////// + +namespace NDetail { + +template <class T1, class T2> +Y_FORCE_INLINE bool CompareAndSet( + TAtomicUint128* atomic, + T1& expected1, + T2& expected2, + T1 new1, + T2 new2) +{ +#if defined(__x86_64__) + bool success; + __asm__ __volatile__ + ( + "lock cmpxchg16b %1\n" + "setz %0" + : "=q"(success) + , "+m"(*atomic) + , "+a"(expected1) + , "+d"(expected2) + : "b"(new1) + , "c"(new2) + : "cc" + ); + return success; +#elif defined(__arm64__) || (defined(__aarch64__) && defined(RTE_ARM_FEATURE_ATOMICS)) + register ui64 x0 __asm("x0") = (ui64)expected1; + register ui64 x1 __asm("x1") = (ui64)expected2; + register ui64 x2 __asm("x2") = (ui64)new1; + register ui64 x3 __asm("x3") = (ui64)new2; + ui64 old1 = (ui64)expected1; + ui64 old2 = (ui64)expected2; + asm volatile + ( +#if defined(RTE_CC_CLANG) + ".arch armv8-a+lse\n" +#endif + "caspal %[old0], %[old1], %[upd0], %[upd1], [%[dst]]" + : [old0] "+r" (x0) + , [old1] "+r" (x1) + : [upd0] "r" (x2) + , [upd1] "r" (x3) + , [dst] "r" (atomic) + : "memory" + ); + expected1 = (T1)x0; + expected2 = (T2)x1; + return x0 == old1 && x1 == old2; +#elif defined(__aarch64__) + ui64 exp1 = reinterpret_cast<ui64>(expected1); + ui64 exp2 = reinterpret_cast<ui64>(expected2); + ui32 fail = 0; + + do { + ui64 current1 = 0; + ui64 current2 = 0; + asm volatile ( + "ldaxp %[cur1], %[cur2], [%[src]]" + : [cur1] "=r" (current1) + , [cur2] "=r" (current2) + : [src] "r" (atomic) + : "memory" + ); + + if (current1 != exp1 || current2 != exp2) { + expected1 = 
reinterpret_cast<T1>(current1); + expected2 = reinterpret_cast<T2>(current2); + return false; + } + + asm volatile ( + "stlxp %w[fail], %[new1], %[new2], [%[dst]]" + : [fail] "=&r" (fail) + : [new1] "r" (new1) + , [new2] "r" (new2) + , [dst] "r" (atomic) + : "memory" + ); + + } while (Y_UNLIKELY(fail)); + return true; +#else +# error Unsupported platform +#endif +} + +} // namespace NDetail + +//////////////////////////////////////////////////////////////////////////////// + +template <class TItem> +TFreeList<TItem>::THead::THead(TItem* pointer) + : Pointer(pointer) +{ } + +template <class TItem> +TFreeList<TItem>::TFreeList() + : Head_() +{ } + +template <class TItem> +TFreeList<TItem>::TFreeList(TFreeList<TItem>&& other) + : Head_(other.ExtractAll()) +{ } + +template <class TItem> +TFreeList<TItem>::~TFreeList() +{ + YT_VERIFY(IsEmpty()); +} + +template <class TItem> +template <class TPredicate> +Y_NO_SANITIZE("thread") +bool TFreeList<TItem>::PutIf(TItem* head, TItem* tail, TPredicate predicate) +{ + auto* current = Head_.Pointer.load(std::memory_order::relaxed); + auto popCount = Head_.PopCount.load(std::memory_order::relaxed); + + while (predicate(current)) { + tail->Next.store(current, std::memory_order::release); + if (NYT::NDetail::CompareAndSet(&AtomicHead_, current, popCount, head, popCount)) { + return true; + } + } + + tail->Next.store(nullptr, std::memory_order::release); + + return false; +} + + +template <class TItem> +Y_NO_SANITIZE("thread") +void TFreeList<TItem>::Put(TItem* head, TItem* tail) +{ + auto* current = Head_.Pointer.load(std::memory_order::relaxed); + auto popCount = Head_.PopCount.load(std::memory_order::relaxed); + + do { + tail->Next.store(current, std::memory_order::release); + } while (!NYT::NDetail::CompareAndSet(&AtomicHead_, current, popCount, head, popCount)); +} + +template <class TItem> +void TFreeList<TItem>::Put(TItem* item) +{ + Put(item, item); +} + +template <class TItem> +Y_NO_SANITIZE("thread") +TItem* 
TFreeList<TItem>::Extract() +{ + auto* current = Head_.Pointer.load(std::memory_order::relaxed); + auto popCount = Head_.PopCount.load(std::memory_order::relaxed); + + while (current) { + // If current node is already extracted by other thread + // there can be any writes at address &current->Next. + // The only guaranteed thing is that address is valid (memory is not freed). + auto next = current->Next.load(std::memory_order::acquire); + if (NYT::NDetail::CompareAndSet(&AtomicHead_, current, popCount, next, popCount + 1)) { + current->Next.store(nullptr, std::memory_order::release); + return current; + } + } + + return nullptr; +} + +template <class TItem> +TItem* TFreeList<TItem>::ExtractAll() +{ + auto* current = Head_.Pointer.load(std::memory_order::relaxed); + auto popCount = Head_.PopCount.load(std::memory_order::relaxed); + + while (current) { + if (NYT::NDetail::CompareAndSet<TItem*, size_t>(&AtomicHead_, current, popCount, nullptr, popCount + 1)) { + return current; + } + } + + return nullptr; +} + +template <class TItem> +bool TFreeList<TItem>::IsEmpty() const +{ + return Head_.Pointer.load() == nullptr; +} + +template <class TItem> +void TFreeList<TItem>::Append(TFreeList<TItem>& other) +{ + auto* head = other.ExtractAll(); + + if (!head) { + return; + } + + auto* tail = head; + while (tail->Next) { + tail = tail->Next; + } + + Put(head, tail); +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT diff --git a/library/cpp/yt/memory/free_list.h b/library/cpp/yt/memory/free_list.h new file mode 100644 index 0000000000..670b7d2cc4 --- /dev/null +++ b/library/cpp/yt/memory/free_list.h @@ -0,0 +1,78 @@ +#pragma once + +#include "public.h" + +#include <util/system/compiler.h> + +#include <atomic> + +namespace NYT { + +//////////////////////////////////////////////////////////////////////////////// + +template <class T> +struct TFreeListItemBase +{ + std::atomic<T*> Next = nullptr; +}; + +// DCAS is supported in 
Clang with option -mcx16, is not supported in GCC. See following links. +// https://gcc.gnu.org/bugzilla/show_bug.cgi?id=84522 +// https://gcc.gnu.org/bugzilla/show_bug.cgi?id=80878 + +using TAtomicUint128 = volatile unsigned __int128 __attribute__((aligned(16))); + +template <class TItem> +class TFreeList +{ +private: + struct THead + { + std::atomic<TItem*> Pointer = {nullptr}; + std::atomic<size_t> PopCount = 0; + + THead() = default; + + explicit THead(TItem* pointer); + + }; + + union + { + THead Head_; + TAtomicUint128 AtomicHead_; + }; + + // Avoid false sharing. + char Padding[CacheLineSize - sizeof(TAtomicUint128)]; + +public: + TFreeList(); + + TFreeList(TFreeList&& other); + + ~TFreeList(); + + template <class TPredicate> + bool PutIf(TItem* head, TItem* tail, TPredicate predicate); + + void Put(TItem* head, TItem* tail); + + void Put(TItem* item); + + TItem* Extract(); + + TItem* ExtractAll(); + + bool IsEmpty() const; + + void Append(TFreeList& other); +}; + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT + +#define FREE_LIST_INL_H_ +#include "free_list-inl.h" +#undef FREE_LIST_INL_H_ diff --git a/library/cpp/yt/memory/public.h b/library/cpp/yt/memory/public.h index fb1546dd59..f05a6b4569 100644 --- a/library/cpp/yt/memory/public.h +++ b/library/cpp/yt/memory/public.h @@ -6,6 +6,9 @@ namespace NYT { //////////////////////////////////////////////////////////////////////////////// +// TODO(babenko): consider increasing to 128 due to cache line pairing in L2 prefetcher. 
+constexpr size_t CacheLineSize = 64; + class TChunkedMemoryPool; DECLARE_REFCOUNTED_STRUCT(IMemoryChunkProvider) diff --git a/library/cpp/yt/memory/unittests/free_list_ut.cpp b/library/cpp/yt/memory/unittests/free_list_ut.cpp new file mode 100644 index 0000000000..307a5abb7c --- /dev/null +++ b/library/cpp/yt/memory/unittests/free_list_ut.cpp @@ -0,0 +1,26 @@ +#include <library/cpp/testing/gtest/gtest.h> + +#include <library/cpp/yt/memory/free_list.h> + +namespace NYT { +namespace { + +//////////////////////////////////////////////////////////////////////////////// + +TEST(TFreeListTest, CompareAndSet) +{ + TAtomicUint128 v = 0; + ui64 p1 = 0; + ui64 p2 = 0; + EXPECT_TRUE(NYT::NDetail::CompareAndSet(&v, p1, p2, ui64{13}, ui64{9})); + EXPECT_FALSE(NYT::NDetail::CompareAndSet(&v, p1, p2, ui64{100}, ui64{500})); + EXPECT_EQ(13u, p1); + EXPECT_EQ(9u, p2); + EXPECT_TRUE(NYT::NDetail::CompareAndSet(&v, p1, p2, ui64{100}, ui64{500})); + EXPECT_EQ(TAtomicUint128{500} << 64 | 100, v); +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace +} // namespace NYT diff --git a/library/cpp/yt/memory/unittests/ya.make b/library/cpp/yt/memory/unittests/ya.make index ff0e639819..684fd1ad26 100644 --- a/library/cpp/yt/memory/unittests/ya.make +++ b/library/cpp/yt/memory/unittests/ya.make @@ -10,6 +10,7 @@ SRCS( atomic_intrusive_ptr_ut.cpp chunked_memory_pool_ut.cpp chunked_memory_pool_output_ut.cpp + free_list_ut.cpp intrusive_ptr_ut.cpp weak_ptr_ut.cpp ) diff --git a/library/cpp/yt/threading/public.h b/library/cpp/yt/threading/public.h index 92d062ac3e..ab6cb22663 100644 --- a/library/cpp/yt/threading/public.h +++ b/library/cpp/yt/threading/public.h @@ -6,13 +6,9 @@ namespace NYT::NThreading { //////////////////////////////////////////////////////////////////////////////// -// TODO(babenko): consider increasing to 128 due to cache line pairing in L2 prefetcher. 
-constexpr size_t CacheLineSize = 64; - #define YT_DECLARE_SPIN_LOCK(type, name) \ type name{__LOCATION__} //////////////////////////////////////////////////////////////////////////////// } // namespace NYT::NThreading - diff --git a/library/cpp/yt/threading/rw_spin_lock.h b/library/cpp/yt/threading/rw_spin_lock.h index 3e834dfcc9..5dfefac686 100644 --- a/library/cpp/yt/threading/rw_spin_lock.h +++ b/library/cpp/yt/threading/rw_spin_lock.h @@ -3,6 +3,8 @@ #include "public.h" #include "spin_lock_base.h" +#include <library/cpp/yt/memory/public.h> + #include <util/system/rwlock.h> #include <atomic> @@ -164,4 +166,3 @@ auto WriterGuard(const T* lock); #define RW_SPIN_LOCK_INL_H_ #include "rw_spin_lock-inl.h" #undef RW_SPIN_LOCK_INL_H_ - diff --git a/library/cpp/yt/threading/spin_lock.h b/library/cpp/yt/threading/spin_lock.h index 28103af1ba..dc3d7a6a59 100644 --- a/library/cpp/yt/threading/spin_lock.h +++ b/library/cpp/yt/threading/spin_lock.h @@ -3,6 +3,8 @@ #include "public.h" #include "spin_lock_base.h" +#include <library/cpp/yt/memory/public.h> + #include <library/cpp/yt/misc/port.h> #include <library/cpp/yt/system/thread_id.h> @@ -78,4 +80,3 @@ private: #define SPIN_LOCK_INL_H_ #include "spin_lock-inl.h" #undef SPIN_LOCK_INL_H_ - diff --git a/library/cpp/ytalloc/impl/core-inl.h b/library/cpp/ytalloc/impl/core-inl.h index e8e5d25442..493a9181c9 100644 --- a/library/cpp/ytalloc/impl/core-inl.h +++ b/library/cpp/ytalloc/impl/core-inl.h @@ -7,6 +7,8 @@ #include <library/cpp/yt/containers/intrusive_linked_list.h> +#include <library/cpp/yt/memory/free_list.h> + #include <library/cpp/yt/threading/at_fork.h> #include <library/cpp/yt/threading/fork_aware_spin_lock.h> @@ -503,89 +505,8 @@ Y_FORCE_INLINE void PoisonUninitializedRange(void* ptr, size_t size) //////////////////////////////////////////////////////////////////////////////// -template <class T> -struct TFreeListItem -{ - T* Next = nullptr; -}; - constexpr size_t CacheLineSize = 64; -// A lock-free stack of items 
(derived from TFreeListItem). -// Supports multiple producers and multiple consumers. -// Internally uses DCAS with tagged pointers to defeat ABA. -template <class T> -class TFreeList -{ -public: - void Put(T* item) - { - TTaggedPointer currentTaggedHead{}; - TTaggedPointer newTaggedHead; - do { - item->Next = currentTaggedHead.first; - newTaggedHead = std::make_pair(item, currentTaggedHead.second + 1); - } while (!CompareAndSet(&TaggedHead_, currentTaggedHead, newTaggedHead)); - } - - T* Extract() - { - T* item; - TTaggedPointer currentTaggedHead{}; - TTaggedPointer newTaggedHead{}; - CompareAndSet(&TaggedHead_, currentTaggedHead, newTaggedHead); - do { - item = currentTaggedHead.first; - if (!item) { - break; - } - newTaggedHead = std::make_pair(item->Next, currentTaggedHead.second + 1); - } while (!CompareAndSet(&TaggedHead_, currentTaggedHead, newTaggedHead)); - return item; - } - - T* ExtractAll() - { - T* item; - TTaggedPointer currentTaggedHead{}; - TTaggedPointer newTaggedHead; - do { - item = currentTaggedHead.first; - newTaggedHead = std::make_pair(nullptr, currentTaggedHead.second + 1); - } while (!CompareAndSet(&TaggedHead_, currentTaggedHead, newTaggedHead)); - return item; - } - -private: - using TAtomicUint128 = volatile unsigned __int128 __attribute__((aligned(16))); - using TTag = ui64; - using TTaggedPointer = std::pair<T*, TTag>; - - TAtomicUint128 TaggedHead_ = 0; - - // Avoid false sharing. 
- char Padding[CacheLineSize - sizeof(TAtomicUint128)]; - -private: - static Y_FORCE_INLINE bool CompareAndSet(TAtomicUint128* atomic, TTaggedPointer& expectedValue, TTaggedPointer newValue) - { - bool success; - __asm__ __volatile__ - ( - "lock cmpxchg16b %1\n" - "setz %0" - : "=q"(success) - , "+m"(*atomic) - , "+a"(expectedValue.first) - , "+d"(expectedValue.second) - : "b"(newValue.first) - , "c"(newValue.second) - : "cc" - ); - return success; - } -}; - static_assert(sizeof(TFreeList<void>) == CacheLineSize, "sizeof(TFreeList) != CacheLineSize"); //////////////////////////////////////////////////////////////////////////////// @@ -1745,7 +1666,7 @@ TExplicitlyConstructableSingleton<TMmapObservationManager> MmapObservationManage // A per-thread structure containing counters, chunk caches etc. struct TThreadState - : public TFreeListItem<TThreadState> + : public TFreeListItemBase<TThreadState> , public TLocalShardedState { // TThreadState instances of all alive threads are put into a double-linked intrusive list. @@ -2915,7 +2836,7 @@ constexpr size_t GroupsBatchSize = 1024; static_assert(ChunksPerGroup <= MaxCachedChunksPerRank, "ChunksPerGroup > MaxCachedChunksPerRank"); class TChunkGroup - : public TFreeListItem<TChunkGroup> + : public TFreeListItemBase<TChunkGroup> { public: bool IsEmpty() const @@ -3447,7 +3368,7 @@ enum ELargeBlobState : ui64 // Every large blob (either tagged or not) is prepended with this header. struct TLargeBlobHeader - : public TFreeListItem<TLargeBlobHeader> + : public TFreeListItemBase<TLargeBlobHeader> { TLargeBlobHeader( TLargeBlobExtent* extent, @@ -3492,7 +3413,7 @@ struct TLargeBlobExtent // A helper node that enables storing a number of extent's segments // in a free list. Recall that segments themselves do not posses any headers. 
struct TDisposedSegment - : public TFreeListItem<TDisposedSegment> + : public TFreeListItemBase<TDisposedSegment> { size_t Index; TLargeBlobExtent* Extent; @@ -3654,7 +3575,7 @@ private: size_t count = 0; while (blob) { - auto* nextBlob = blob->Next; + auto* nextBlob = blob->Next.load(); YTALLOC_VERIFY(!blob->Locked.load()); AssertBlobState(blob, ELargeBlobState::LockedSpare); blob->State = ELargeBlobState::Spare; @@ -3677,7 +3598,7 @@ private: size_t count = 0; while (blob) { - auto* nextBlob = blob->Next; + auto* nextBlob = blob->Next.load(); AssertBlobState(blob, ELargeBlobState::LockedFreed); MoveBlobToSpare(state, arena, blob, false); ++count; @@ -4926,4 +4847,3 @@ TEnumIndexedVector<ETimingEventType, TTimingEventCounters> GetTimingEventCounter //////////////////////////////////////////////////////////////////////////////// } // namespace NYT::NYTAlloc - |