aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoraleexfi <aleexfi@yandex-team.com>2023-03-17 18:24:11 +0300
committeraleexfi <aleexfi@yandex-team.com>2023-03-17 18:24:11 +0300
commit7825c9057d3fad670eadd60509802152127d6e49 (patch)
tree465946e4c2efd91fda1b5806ec3c7fe760d3f6ca
parent9041b256167b6c37a6d99acdbf5f606f7f47bb02 (diff)
downloadydb-7825c9057d3fad670eadd60509802152127d6e49.tar.gz
YT-17689: Move TFreeList to library/cpp/yt/memory
-rw-r--r--library/cpp/yt/memory/free_list-inl.h222
-rw-r--r--library/cpp/yt/memory/free_list.h78
-rw-r--r--library/cpp/yt/memory/public.h3
-rw-r--r--library/cpp/yt/memory/unittests/free_list_ut.cpp26
-rw-r--r--library/cpp/yt/memory/unittests/ya.make1
-rw-r--r--library/cpp/yt/threading/public.h4
-rw-r--r--library/cpp/yt/threading/rw_spin_lock.h3
-rw-r--r--library/cpp/yt/threading/spin_lock.h3
-rw-r--r--library/cpp/ytalloc/impl/core-inl.h96
9 files changed, 342 insertions, 94 deletions
diff --git a/library/cpp/yt/memory/free_list-inl.h b/library/cpp/yt/memory/free_list-inl.h
new file mode 100644
index 0000000000..45bb2b8f10
--- /dev/null
+++ b/library/cpp/yt/memory/free_list-inl.h
@@ -0,0 +1,222 @@
+#ifndef FREE_LIST_INL_H_
+#error "Direct inclusion of this file is not allowed, include free_list.h"
+// For the sake of sane code completion.
+#include "free_list.h"
+#endif
+
+namespace NYT {
+
+////////////////////////////////////////////////////////////////////////////////
+
+namespace NDetail {
+
+//! Double-width (16-byte) compare-and-swap.
+/*!
+ *  Atomically compares the value at |atomic| with the pair (|expected1|, |expected2|)
+ *  and, if they match, replaces it with (|new1|, |new2|).
+ *  The first component of each pair maps to the low 8 bytes and the second one
+ *  to the high 8 bytes.
+ *
+ *  \returns true if the value was replaced. Otherwise returns false and writes the
+ *  pair actually observed in memory back into |expected1| and |expected2|, so that
+ *  callers can retry without reloading.
+ *
+ *  |T1| and |T2| are passed in general-purpose registers and are therefore expected
+ *  to be 8-byte scalars (pointers or 64-bit integers).
+ */
+template <class T1, class T2>
+Y_FORCE_INLINE bool CompareAndSet(
+    TAtomicUint128* atomic,
+    T1& expected1,
+    T2& expected2,
+    T1 new1,
+    T2 new2)
+{
+#if defined(__x86_64__)
+    bool success;
+    __asm__ __volatile__
+    (
+        "lock cmpxchg16b %1\n"
+        "setz %0"
+        : "=q"(success)
+        , "+m"(*atomic)
+        , "+a"(expected1)
+        , "+d"(expected2)
+        : "b"(new1)
+        , "c"(new2)
+        // The "memory" clobber turns the statement into a compiler-level barrier:
+        // surrounding loads and stores must not be cached in registers or moved
+        // across the CAS. The ARM code paths below already declare it.
+        : "cc"
+        , "memory"
+    );
+    return success;
+#elif defined(__arm64__) || (defined(__aarch64__) && defined(RTE_ARM_FEATURE_ATOMICS))
+    // The expected and the new pairs are pinned to x0/x1 and x2/x3 respectively,
+    // i.e. to adjacent registers, as operands of a single CASPAL instruction.
+    register ui64 x0 __asm("x0") = (ui64)expected1;
+    register ui64 x1 __asm("x1") = (ui64)expected2;
+    register ui64 x2 __asm("x2") = (ui64)new1;
+    register ui64 x3 __asm("x3") = (ui64)new2;
+    // Remember what was expected: the instruction overwrites x0/x1 with the observed value.
+    ui64 old1 = (ui64)expected1;
+    ui64 old2 = (ui64)expected2;
+    asm volatile
+    (
+#if defined(RTE_CC_CLANG)
+        ".arch armv8-a+lse\n"
+#endif
+        "caspal %[old0], %[old1], %[upd0], %[upd1], [%[dst]]"
+        : [old0] "+r" (x0)
+        , [old1] "+r" (x1)
+        : [upd0] "r" (x2)
+        , [upd1] "r" (x3)
+        , [dst] "r" (atomic)
+        : "memory"
+    );
+    expected1 = (T1)x0;
+    expected2 = (T2)x1;
+    // The swap took place iff the observed value equals the expected one.
+    return x0 == old1 && x1 == old2;
+#elif defined(__aarch64__)
+    // Fallback: a load-exclusive/store-exclusive loop.
+    ui64 exp1 = reinterpret_cast<ui64>(expected1);
+    ui64 exp2 = reinterpret_cast<ui64>(expected2);
+    ui32 fail = 0;
+
+    do {
+        ui64 current1 = 0;
+        ui64 current2 = 0;
+        asm volatile (
+            "ldaxp %[cur1], %[cur2], [%[src]]"
+            : [cur1] "=r" (current1)
+            , [cur2] "=r" (current2)
+            : [src] "r" (atomic)
+            : "memory"
+        );
+
+        if (current1 != exp1 || current2 != exp2) {
+            // Mismatch: report the observed pair to the caller.
+            expected1 = reinterpret_cast<T1>(current1);
+            expected2 = reinterpret_cast<T2>(current2);
+            return false;
+        }
+
+        asm volatile (
+            "stlxp %w[fail], %[new1], %[new2], [%[dst]]"
+            : [fail] "=&r" (fail)
+            : [new1] "r" (new1)
+            , [new2] "r" (new2)
+            , [dst] "r" (atomic)
+            : "memory"
+        );
+
+        // A non-zero status means the exclusive store did not go through; start over.
+    } while (Y_UNLIKELY(fail));
+    return true;
+#else
+#    error Unsupported platform
+#endif
+}
+
+} // namespace NDetail
+
+////////////////////////////////////////////////////////////////////////////////
+
+// Builds a head that points at |pointer|; PopCount keeps its in-class default.
+template <class TItem>
+TFreeList<TItem>::THead::THead(TItem* pointer)
+ : Pointer(pointer)
+{ }
+
+// Creates an empty list by value-initializing the head.
+template <class TItem>
+TFreeList<TItem>::TFreeList()
+ : Head_()
+{ }
+
+// Move constructor: detaches the whole chain from |other| via ExtractAll
+// and adopts it as the head of the new list.
+template <class TItem>
+TFreeList<TItem>::TFreeList(TFreeList<TItem>&& other)
+ : Head_(other.ExtractAll())
+{ }
+
+// The list does not dispose of its items: it verifies that it has been
+// drained by the time it is destroyed.
+template <class TItem>
+TFreeList<TItem>::~TFreeList()
+{
+ YT_VERIFY(IsEmpty());
+}
+
+// Conditionally pushes the chain |head| ... |tail| on top of the list.
+// |predicate| is invoked with the head pointer observed so far and is consulted
+// again after every failed CAS attempt.
+// Returns true once the chain has been pushed; returns false (after resetting
+// |tail->Next| to null) as soon as the predicate rejects the observed head.
+template <class TItem>
+template <class TPredicate>
+Y_NO_SANITIZE("thread")
+bool TFreeList<TItem>::PutIf(TItem* head, TItem* tail, TPredicate predicate)
+{
+    // A relaxed snapshot suffices here: CompareAndSet refreshes both values on failure.
+    auto* observedHead = Head_.Pointer.load(std::memory_order::relaxed);
+    auto observedPopCount = Head_.PopCount.load(std::memory_order::relaxed);
+
+    for (;;) {
+        if (predicate(observedHead)) {
+            // Link the chain in front of the observed head and try to publish it.
+            // Pushing leaves the pop counter intact.
+            tail->Next.store(observedHead, std::memory_order::release);
+            if (NYT::NDetail::CompareAndSet(&AtomicHead_, observedHead, observedPopCount, head, observedPopCount)) {
+                return true;
+            }
+            continue;
+        }
+
+        // Rejected: undo the tentative link left by a previous attempt (if any).
+        tail->Next.store(nullptr, std::memory_order::release);
+        return false;
+    }
+}
+
+
+// Unconditionally pushes the chain |head| ... |tail| on top of the list,
+// retrying until the head is swapped successfully.
+template <class TItem>
+Y_NO_SANITIZE("thread")
+void TFreeList<TItem>::Put(TItem* head, TItem* tail)
+{
+    // A relaxed snapshot suffices here: CompareAndSet refreshes both values on failure.
+    auto* observedHead = Head_.Pointer.load(std::memory_order::relaxed);
+    auto observedPopCount = Head_.PopCount.load(std::memory_order::relaxed);
+
+    for (;;) {
+        // Link the chain in front of the observed head, then try to publish it.
+        // Pushing leaves the pop counter intact.
+        tail->Next.store(observedHead, std::memory_order::release);
+        if (NYT::NDetail::CompareAndSet(&AtomicHead_, observedHead, observedPopCount, head, observedPopCount)) {
+            return;
+        }
+    }
+}
+
+// Pushes a single item: a chain whose head and tail coincide.
+template <class TItem>
+void TFreeList<TItem>::Put(TItem* item)
+{
+ Put(item, item);
+}
+
+// Pops the topmost item and returns it with |Next| reset to null.
+// Returns null if the list is observed to be empty.
+template <class TItem>
+Y_NO_SANITIZE("thread")
+TItem* TFreeList<TItem>::Extract()
+{
+    // A relaxed snapshot suffices here: CompareAndSet refreshes both values on failure.
+    auto* observedHead = Head_.Pointer.load(std::memory_order::relaxed);
+    auto observedPopCount = Head_.PopCount.load(std::memory_order::relaxed);
+
+    for (;;) {
+        if (!observedHead) {
+            return nullptr;
+        }
+
+        // Another thread may have already extracted the observed head, in which case
+        // arbitrary values may be written to |observedHead->Next| concurrently.
+        // The only guarantee is that the address remains valid (the memory is not freed).
+        auto successor = observedHead->Next.load(std::memory_order::acquire);
+
+        // Popping increments the pop counter along with replacing the head pointer.
+        if (!NYT::NDetail::CompareAndSet(&AtomicHead_, observedHead, observedPopCount, successor, observedPopCount + 1)) {
+            continue;
+        }
+
+        observedHead->Next.store(nullptr, std::memory_order::release);
+        return observedHead;
+    }
+}
+
+// Detaches the entire chain from the list and returns its first item;
+// the items stay linked through |Next|. Returns null if the list is observed to be empty.
+template <class TItem>
+TItem* TFreeList<TItem>::ExtractAll()
+{
+    // A relaxed snapshot suffices here: CompareAndSet refreshes both values on failure.
+    auto* observedHead = Head_.Pointer.load(std::memory_order::relaxed);
+    auto observedPopCount = Head_.PopCount.load(std::memory_order::relaxed);
+
+    for (;;) {
+        if (!observedHead) {
+            return nullptr;
+        }
+
+        // Replace the head with null and increment the pop counter in a single step.
+        if (NYT::NDetail::CompareAndSet<TItem*, size_t>(&AtomicHead_, observedHead, observedPopCount, nullptr, observedPopCount + 1)) {
+            return observedHead;
+        }
+    }
+}
+
+// Returns true if the head pointer is null at the moment of the call.
+template <class TItem>
+bool TFreeList<TItem>::IsEmpty() const
+{
+ return Head_.Pointer.load() == nullptr;
+}
+
+// Moves all items of |other| into this list.
+// The chain is first detached from |other| via ExtractAll, then its tail is located,
+// and finally the whole chain is pushed with a single Put.
+template <class TItem>
+void TFreeList<TItem>::Append(TFreeList<TItem>& other)
+{
+    auto* head = other.ExtractAll();
+
+    if (!head) {
+        return;
+    }
+
+    // Walk to the last item. |Next| is atomic, so it is loaded exactly once per step:
+    // the value that is tested is the very value that is followed.
+    auto* tail = head;
+    while (auto* next = tail->Next.load()) {
+        tail = next;
+    }
+
+    Put(head, tail);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT
diff --git a/library/cpp/yt/memory/free_list.h b/library/cpp/yt/memory/free_list.h
new file mode 100644
index 0000000000..670b7d2cc4
--- /dev/null
+++ b/library/cpp/yt/memory/free_list.h
@@ -0,0 +1,78 @@
+#pragma once
+
+#include "public.h"
+
+#include <util/system/compiler.h>
+
+#include <atomic>
+
+namespace NYT {
+
+////////////////////////////////////////////////////////////////////////////////
+
+// Base for items stored in TFreeList: provides the atomic |Next| link
+// that the list reads and writes when chaining items.
+template <class T>
+struct TFreeListItemBase
+{
+ // Following item in the chain; null by default.
+ std::atomic<T*> Next = nullptr;
+};
+
+// DCAS is supported by Clang (with the -mcx16 option) but not by GCC; see the following links.
+// https://gcc.gnu.org/bugzilla/show_bug.cgi?id=84522
+// https://gcc.gnu.org/bugzilla/show_bug.cgi?id=80878
+
+using TAtomicUint128 = volatile unsigned __int128 __attribute__((aligned(16)));
+
+// A lock-free intrusive stack of TItem objects chained through their |Next| fields.
+// The head is a (pointer, pop counter) pair that is updated as a single 16-byte
+// value by means of NDetail::CompareAndSet.
+template <class TItem>
+class TFreeList
+{
+private:
+ struct THead
+ {
+ // Topmost item of the stack; null when the list is empty.
+ std::atomic<TItem*> Pointer = {nullptr};
+ // Incremented each time Extract or ExtractAll detaches items from the list.
+ std::atomic<size_t> PopCount = 0;
+
+ THead() = default;
+
+ explicit THead(TItem* pointer);
+
+ };
+
+ // Two views of the same 16 bytes: field-wise access for plain loads
+ // and a single 128-bit value for the double-width CAS.
+ union
+ {
+ THead Head_;
+ TAtomicUint128 AtomicHead_;
+ };
+
+ // Avoid false sharing.
+ char Padding[CacheLineSize - sizeof(TAtomicUint128)];
+
+public:
+ // Creates an empty list.
+ TFreeList();
+
+ // Takes over all items of |other|.
+ TFreeList(TFreeList&& other);
+
+ // Verifies that the list is empty.
+ ~TFreeList();
+
+ // Pushes the chain |head| ... |tail| provided that |predicate|, invoked with
+ // the currently observed head pointer, returns true. Returns whether the chain was pushed.
+ template <class TPredicate>
+ bool PutIf(TItem* head, TItem* tail, TPredicate predicate);
+
+ // Pushes the chain |head| ... |tail| on top of the list.
+ void Put(TItem* head, TItem* tail);
+
+ // Pushes a single item on top of the list.
+ void Put(TItem* item);
+
+ // Pops the topmost item; returns null if the list is empty.
+ TItem* Extract();
+
+ // Detaches all items at once and returns the first one; returns null if the list is empty.
+ TItem* ExtractAll();
+
+ // Returns true if the list currently has no items.
+ bool IsEmpty() const;
+
+ // Moves all items of |other| into this list.
+ void Append(TFreeList& other);
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT
+
+#define FREE_LIST_INL_H_
+#include "free_list-inl.h"
+#undef FREE_LIST_INL_H_
diff --git a/library/cpp/yt/memory/public.h b/library/cpp/yt/memory/public.h
index fb1546dd59..f05a6b4569 100644
--- a/library/cpp/yt/memory/public.h
+++ b/library/cpp/yt/memory/public.h
@@ -6,6 +6,9 @@ namespace NYT {
////////////////////////////////////////////////////////////////////////////////
+// TODO(babenko): consider increasing to 128 due to cache line pairing in L2 prefetcher.
+constexpr size_t CacheLineSize = 64;
+
class TChunkedMemoryPool;
DECLARE_REFCOUNTED_STRUCT(IMemoryChunkProvider)
diff --git a/library/cpp/yt/memory/unittests/free_list_ut.cpp b/library/cpp/yt/memory/unittests/free_list_ut.cpp
new file mode 100644
index 0000000000..307a5abb7c
--- /dev/null
+++ b/library/cpp/yt/memory/unittests/free_list_ut.cpp
@@ -0,0 +1,26 @@
+#include <library/cpp/testing/gtest/gtest.h>
+
+#include <library/cpp/yt/memory/free_list.h>
+
+namespace NYT {
+namespace {
+
+////////////////////////////////////////////////////////////////////////////////
+
+// Single-threaded check of NDetail::CompareAndSet: a successful swap, a failed swap
+// that reports the stored value, and a successful retry with the reported value.
+TEST(TFreeListTest, CompareAndSet)
+{
+ TAtomicUint128 v = 0;
+ ui64 p1 = 0;
+ ui64 p2 = 0;
+ // Expected (0, 0) matches the initial value, so (13, 9) gets stored.
+ EXPECT_TRUE(NYT::NDetail::CompareAndSet(&v, p1, p2, ui64{13}, ui64{9}));
+ // Expected (0, 0) no longer matches: the call fails and writes the stored pair into p1/p2.
+ EXPECT_FALSE(NYT::NDetail::CompareAndSet(&v, p1, p2, ui64{100}, ui64{500}));
+ EXPECT_EQ(13u, p1);
+ EXPECT_EQ(9u, p2);
+ // With the refreshed expectation the swap succeeds.
+ EXPECT_TRUE(NYT::NDetail::CompareAndSet(&v, p1, p2, ui64{100}, ui64{500}));
+ // The first component occupies the low 64 bits, the second one the high 64 bits.
+ EXPECT_EQ(TAtomicUint128{500} << 64 | 100, v);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace
+} // namespace NYT
diff --git a/library/cpp/yt/memory/unittests/ya.make b/library/cpp/yt/memory/unittests/ya.make
index ff0e639819..684fd1ad26 100644
--- a/library/cpp/yt/memory/unittests/ya.make
+++ b/library/cpp/yt/memory/unittests/ya.make
@@ -10,6 +10,7 @@ SRCS(
atomic_intrusive_ptr_ut.cpp
chunked_memory_pool_ut.cpp
chunked_memory_pool_output_ut.cpp
+ free_list_ut.cpp
intrusive_ptr_ut.cpp
weak_ptr_ut.cpp
)
diff --git a/library/cpp/yt/threading/public.h b/library/cpp/yt/threading/public.h
index 92d062ac3e..ab6cb22663 100644
--- a/library/cpp/yt/threading/public.h
+++ b/library/cpp/yt/threading/public.h
@@ -6,13 +6,9 @@ namespace NYT::NThreading {
////////////////////////////////////////////////////////////////////////////////
-// TODO(babenko): consider increasing to 128 due to cache line pairing in L2 prefetcher.
-constexpr size_t CacheLineSize = 64;
-
#define YT_DECLARE_SPIN_LOCK(type, name) \
type name{__LOCATION__}
////////////////////////////////////////////////////////////////////////////////
} // namespace NYT::NThreading
-
diff --git a/library/cpp/yt/threading/rw_spin_lock.h b/library/cpp/yt/threading/rw_spin_lock.h
index 3e834dfcc9..5dfefac686 100644
--- a/library/cpp/yt/threading/rw_spin_lock.h
+++ b/library/cpp/yt/threading/rw_spin_lock.h
@@ -3,6 +3,8 @@
#include "public.h"
#include "spin_lock_base.h"
+#include <library/cpp/yt/memory/public.h>
+
#include <util/system/rwlock.h>
#include <atomic>
@@ -164,4 +166,3 @@ auto WriterGuard(const T* lock);
#define RW_SPIN_LOCK_INL_H_
#include "rw_spin_lock-inl.h"
#undef RW_SPIN_LOCK_INL_H_
-
diff --git a/library/cpp/yt/threading/spin_lock.h b/library/cpp/yt/threading/spin_lock.h
index 28103af1ba..dc3d7a6a59 100644
--- a/library/cpp/yt/threading/spin_lock.h
+++ b/library/cpp/yt/threading/spin_lock.h
@@ -3,6 +3,8 @@
#include "public.h"
#include "spin_lock_base.h"
+#include <library/cpp/yt/memory/public.h>
+
#include <library/cpp/yt/misc/port.h>
#include <library/cpp/yt/system/thread_id.h>
@@ -78,4 +80,3 @@ private:
#define SPIN_LOCK_INL_H_
#include "spin_lock-inl.h"
#undef SPIN_LOCK_INL_H_
-
diff --git a/library/cpp/ytalloc/impl/core-inl.h b/library/cpp/ytalloc/impl/core-inl.h
index e8e5d25442..493a9181c9 100644
--- a/library/cpp/ytalloc/impl/core-inl.h
+++ b/library/cpp/ytalloc/impl/core-inl.h
@@ -7,6 +7,8 @@
#include <library/cpp/yt/containers/intrusive_linked_list.h>
+#include <library/cpp/yt/memory/free_list.h>
+
#include <library/cpp/yt/threading/at_fork.h>
#include <library/cpp/yt/threading/fork_aware_spin_lock.h>
@@ -503,89 +505,8 @@ Y_FORCE_INLINE void PoisonUninitializedRange(void* ptr, size_t size)
////////////////////////////////////////////////////////////////////////////////
-template <class T>
-struct TFreeListItem
-{
- T* Next = nullptr;
-};
-
constexpr size_t CacheLineSize = 64;
-// A lock-free stack of items (derived from TFreeListItem).
-// Supports multiple producers and multiple consumers.
-// Internally uses DCAS with tagged pointers to defeat ABA.
-template <class T>
-class TFreeList
-{
-public:
- void Put(T* item)
- {
- TTaggedPointer currentTaggedHead{};
- TTaggedPointer newTaggedHead;
- do {
- item->Next = currentTaggedHead.first;
- newTaggedHead = std::make_pair(item, currentTaggedHead.second + 1);
- } while (!CompareAndSet(&TaggedHead_, currentTaggedHead, newTaggedHead));
- }
-
- T* Extract()
- {
- T* item;
- TTaggedPointer currentTaggedHead{};
- TTaggedPointer newTaggedHead{};
- CompareAndSet(&TaggedHead_, currentTaggedHead, newTaggedHead);
- do {
- item = currentTaggedHead.first;
- if (!item) {
- break;
- }
- newTaggedHead = std::make_pair(item->Next, currentTaggedHead.second + 1);
- } while (!CompareAndSet(&TaggedHead_, currentTaggedHead, newTaggedHead));
- return item;
- }
-
- T* ExtractAll()
- {
- T* item;
- TTaggedPointer currentTaggedHead{};
- TTaggedPointer newTaggedHead;
- do {
- item = currentTaggedHead.first;
- newTaggedHead = std::make_pair(nullptr, currentTaggedHead.second + 1);
- } while (!CompareAndSet(&TaggedHead_, currentTaggedHead, newTaggedHead));
- return item;
- }
-
-private:
- using TAtomicUint128 = volatile unsigned __int128 __attribute__((aligned(16)));
- using TTag = ui64;
- using TTaggedPointer = std::pair<T*, TTag>;
-
- TAtomicUint128 TaggedHead_ = 0;
-
- // Avoid false sharing.
- char Padding[CacheLineSize - sizeof(TAtomicUint128)];
-
-private:
- static Y_FORCE_INLINE bool CompareAndSet(TAtomicUint128* atomic, TTaggedPointer& expectedValue, TTaggedPointer newValue)
- {
- bool success;
- __asm__ __volatile__
- (
- "lock cmpxchg16b %1\n"
- "setz %0"
- : "=q"(success)
- , "+m"(*atomic)
- , "+a"(expectedValue.first)
- , "+d"(expectedValue.second)
- : "b"(newValue.first)
- , "c"(newValue.second)
- : "cc"
- );
- return success;
- }
-};
-
static_assert(sizeof(TFreeList<void>) == CacheLineSize, "sizeof(TFreeList) != CacheLineSize");
////////////////////////////////////////////////////////////////////////////////
@@ -1745,7 +1666,7 @@ TExplicitlyConstructableSingleton<TMmapObservationManager> MmapObservationManage
// A per-thread structure containing counters, chunk caches etc.
struct TThreadState
- : public TFreeListItem<TThreadState>
+ : public TFreeListItemBase<TThreadState>
, public TLocalShardedState
{
// TThreadState instances of all alive threads are put into a double-linked intrusive list.
@@ -2915,7 +2836,7 @@ constexpr size_t GroupsBatchSize = 1024;
static_assert(ChunksPerGroup <= MaxCachedChunksPerRank, "ChunksPerGroup > MaxCachedChunksPerRank");
class TChunkGroup
- : public TFreeListItem<TChunkGroup>
+ : public TFreeListItemBase<TChunkGroup>
{
public:
bool IsEmpty() const
@@ -3447,7 +3368,7 @@ enum ELargeBlobState : ui64
// Every large blob (either tagged or not) is prepended with this header.
struct TLargeBlobHeader
- : public TFreeListItem<TLargeBlobHeader>
+ : public TFreeListItemBase<TLargeBlobHeader>
{
TLargeBlobHeader(
TLargeBlobExtent* extent,
@@ -3492,7 +3413,7 @@ struct TLargeBlobExtent
// A helper node that enables storing a number of extent's segments
// in a free list. Recall that segments themselves do not posses any headers.
struct TDisposedSegment
- : public TFreeListItem<TDisposedSegment>
+ : public TFreeListItemBase<TDisposedSegment>
{
size_t Index;
TLargeBlobExtent* Extent;
@@ -3654,7 +3575,7 @@ private:
size_t count = 0;
while (blob) {
- auto* nextBlob = blob->Next;
+ auto* nextBlob = blob->Next.load();
YTALLOC_VERIFY(!blob->Locked.load());
AssertBlobState(blob, ELargeBlobState::LockedSpare);
blob->State = ELargeBlobState::Spare;
@@ -3677,7 +3598,7 @@ private:
size_t count = 0;
while (blob) {
- auto* nextBlob = blob->Next;
+ auto* nextBlob = blob->Next.load();
AssertBlobState(blob, ELargeBlobState::LockedFreed);
MoveBlobToSpare(state, arena, blob, false);
++count;
@@ -4926,4 +4847,3 @@ TEnumIndexedVector<ETimingEventType, TTimingEventCounters> GetTimingEventCounter
////////////////////////////////////////////////////////////////////////////////
} // namespace NYT::NYTAlloc
-