aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
author aleexfi <aleexfi@yandex-team.com> 2023-05-10 13:26:37 +0300
committer aleexfi <aleexfi@yandex-team.com> 2023-05-10 13:26:37 +0300
commit a5581558250508c2cae6de5225e540e12289a530 (patch)
tree 4cb62dc7dd7e67f753acf539d3515bb9c41bba67
parent cc5864c4a61e0fb13950c8ad0c2067874cddcc2f (diff)
download ydb-a5581558250508c2cae6de5225e540e12289a530.tar.gz
YT-17689: Move TFreeList to library/cpp/yt/memory
Iteration no. 2. First one reverted due to YT-18997
-rw-r--r-- contrib/libs/cxxsupp/libcxx/include/latch 113
-rw-r--r-- library/cpp/yt/memory/free_list-inl.h 222
-rw-r--r-- library/cpp/yt/memory/free_list.h 72
-rw-r--r-- library/cpp/yt/memory/public.h 3
-rw-r--r-- library/cpp/yt/memory/unittests/free_list_ut.cpp 158
-rw-r--r-- library/cpp/yt/threading/public.h 4
-rw-r--r-- library/cpp/yt/threading/rw_spin_lock.h 2
-rw-r--r-- library/cpp/yt/threading/spin_lock.h 3
8 files changed, 572 insertions, 5 deletions
diff --git a/contrib/libs/cxxsupp/libcxx/include/latch b/contrib/libs/cxxsupp/libcxx/include/latch
new file mode 100644
index 0000000000..e1e15190ae
--- /dev/null
+++ b/contrib/libs/cxxsupp/libcxx/include/latch
@@ -0,0 +1,113 @@
+// -*- C++ -*-
+//===----------------------------------------------------------------------===//
+//
+// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
+// See https://llvm.org/LICENSE.txt for license information.
+// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
+//
+//===----------------------------------------------------------------------===//
+
+#ifndef _LIBCPP_LATCH
+#define _LIBCPP_LATCH
+
+/*
+ latch synopsis
+
+namespace std
+{
+
+ class latch
+ {
+ public:
+ static constexpr ptrdiff_t max() noexcept;
+
+ constexpr explicit latch(ptrdiff_t __expected);
+ ~latch();
+
+ latch(const latch&) = delete;
+ latch& operator=(const latch&) = delete;
+
+ void count_down(ptrdiff_t __update = 1);
+ bool try_wait() const noexcept;
+ void wait() const;
+ void arrive_and_wait(ptrdiff_t __update = 1);
+
+ private:
+ ptrdiff_t __counter; // exposition only
+ };
+
+}
+
+*/
+
+#include <__availability>
+#include <__config>
+#include <atomic>
+#include <version>
+
+#if !defined(_LIBCPP_HAS_NO_PRAGMA_SYSTEM_HEADER)
+# pragma GCC system_header
+#endif
+
+#ifdef _LIBCPP_HAS_NO_THREADS
+# error <latch> is not supported on this single threaded system
+#endif
+
+_LIBCPP_PUSH_MACROS
+#include <__undef_macros>
+
+#if _LIBCPP_STD_VER >= 14
+
+_LIBCPP_BEGIN_NAMESPACE_STD
+
+// Downward counter that threads can block on until it reaches zero.
+class latch
+{
+ // Remaining count; decremented by count_down() and read by try_wait().
+ __atomic_base<ptrdiff_t> __a;
+
+public:
+ // Largest value representable by the counter type.
+ static constexpr ptrdiff_t max() noexcept {
+ return numeric_limits<ptrdiff_t>::max();
+ }
+
+ // Initializes the counter with __expected.
+ inline _LIBCPP_INLINE_VISIBILITY
+ constexpr explicit latch(ptrdiff_t __expected) : __a(__expected) { }
+
+ ~latch() = default;
+ latch(const latch&) = delete;
+ latch& operator=(const latch&) = delete;
+
+ // Subtracts __update from the counter with release ordering and notifies
+ // all waiters when this call brought the counter down to zero.
+ inline _LIBCPP_AVAILABILITY_SYNC _LIBCPP_INLINE_VISIBILITY
+ void count_down(ptrdiff_t __update = 1)
+ {
+ auto const __old = __a.fetch_sub(__update, memory_order_release);
+ // The previous value equals the decrement exactly when the counter has just hit zero.
+ if(__old == __update)
+ __a.notify_all();
+ }
+ // Returns true if the counter is zero (acquire load); never blocks.
+ inline _LIBCPP_INLINE_VISIBILITY
+ bool try_wait() const noexcept
+ {
+ return 0 == __a.load(memory_order_acquire);
+ }
+ // Hands a predicate based on try_wait() to __cxx_atomic_wait;
+ // presumably blocks until the predicate reports true.
+ inline _LIBCPP_AVAILABILITY_SYNC _LIBCPP_INLINE_VISIBILITY
+ void wait() const
+ {
+ auto const __test_fn = [=]() -> bool {
+ return try_wait();
+ };
+ __cxx_atomic_wait(&__a.__a_, __test_fn);
+ }
+ // Convenience wrapper: count_down(__update) followed by wait().
+ inline _LIBCPP_AVAILABILITY_SYNC _LIBCPP_INLINE_VISIBILITY
+ void arrive_and_wait(ptrdiff_t __update = 1)
+ {
+ count_down(__update);
+ wait();
+ }
+};
+
+_LIBCPP_END_NAMESPACE_STD
+
+#endif // _LIBCPP_STD_VER >= 14
+
+_LIBCPP_POP_MACROS
+
+#endif //_LIBCPP_LATCH
diff --git a/library/cpp/yt/memory/free_list-inl.h b/library/cpp/yt/memory/free_list-inl.h
new file mode 100644
index 0000000000..d428b82498
--- /dev/null
+++ b/library/cpp/yt/memory/free_list-inl.h
@@ -0,0 +1,222 @@
+#ifndef FREE_LIST_INL_H_
+#error "Direct inclusion of this file is not allowed, include free_list.h"
+// For the sake of sane code completion.
+#include "free_list.h"
+#endif
+
+namespace NYT {
+
+////////////////////////////////////////////////////////////////////////////////
+
+// DCAS is supported in Clang with the -mcx16 option but is not supported in GCC. See the following links:
+// https://gcc.gnu.org/bugzilla/show_bug.cgi?id=84522
+// https://gcc.gnu.org/bugzilla/show_bug.cgi?id=80878
+
+// Double-width compare-and-swap over a 16-byte aligned 128-bit cell.
+//
+// Compares |*atomic| with the pair (|expected1|, |expected2|) and, when they match,
+// stores (|new1|, |new2|) in its place. The first element of each pair maps to the
+// low 64 bits of the cell and the second one to the high 64 bits.
+//
+// Returns true if the new pair was stored. Otherwise returns false and overwrites
+// |expected1| and |expected2| with the pair that was actually observed, so a caller
+// may retry without reloading the cell.
+//
+// T1 and T2 must be 64-bit types (pointers or 64-bit integers).
+template <class T1, class T2>
+Y_FORCE_INLINE bool CompareAndSet(
+    TAtomicUint128* atomic,
+    T1& expected1,
+    T2& expected2,
+    T1 new1,
+    T2 new2)
+{
+#if defined(__x86_64__)
+    bool success;
+    // cmpxchg16b compares RDX:RAX with the memory operand and stores RCX:RBX on a match;
+    // on a mismatch it loads the current value into RDX:RAX, hence the read-write
+    // constraints on the expected operands. ZF reports the outcome.
+    __asm__ __volatile__
+    (
+        "lock cmpxchg16b %1\n"
+        "setz %0"
+        : "=q"(success)
+        , "+m"(*atomic)
+        , "+a"(expected1)
+        , "+d"(expected2)
+        : "b"(new1)
+        , "c"(new2)
+        // The "memory" clobber makes the statement a compiler-level barrier, so stores
+        // that prepare a node (e.g. its Next link) cannot be moved across the CAS.
+        // This matches the AArch64 branches below.
+        : "cc", "memory"
+    );
+    return success;
+#elif defined(__arm64__) || (defined(__aarch64__) && defined(RTE_ARM_FEATURE_ATOMICS))
+    // CASP needs the compare and the swap operands in consecutive register pairs,
+    // so they are pinned to x0/x1 and x2/x3 explicitly.
+    register ui64 x0 __asm("x0") = (ui64)expected1;
+    register ui64 x1 __asm("x1") = (ui64)expected2;
+    register ui64 x2 __asm("x2") = (ui64)new1;
+    register ui64 x3 __asm("x3") = (ui64)new2;
+    ui64 old1 = (ui64)expected1;
+    ui64 old2 = (ui64)expected2;
+    asm volatile
+    (
+#if defined(RTE_CC_CLANG)
+        ".arch armv8-a+lse\n"
+#endif
+        "caspal %[old0], %[old1], %[upd0], %[upd1], [%[dst]]"
+        : [old0] "+r" (x0)
+        , [old1] "+r" (x1)
+        : [upd0] "r" (x2)
+        , [upd1] "r" (x3)
+        , [dst] "r" (atomic)
+        : "memory"
+    );
+    // x0/x1 now hold the value that was found in memory; the swap happened
+    // exactly when it equals the value we expected.
+    expected1 = (T1)x0;
+    expected2 = (T2)x1;
+    return x0 == old1 && x1 == old2;
+#elif defined(__aarch64__)
+    // Fallback without LSE: load-exclusive / store-exclusive retry loop.
+    ui64 exp1 = reinterpret_cast<ui64>(expected1);
+    ui64 exp2 = reinterpret_cast<ui64>(expected2);
+    ui32 fail = 0;
+
+    do {
+        ui64 current1 = 0;
+        ui64 current2 = 0;
+        asm volatile (
+            "ldaxp %[cur1], %[cur2], [%[src]]"
+            : [cur1] "=r" (current1)
+            , [cur2] "=r" (current2)
+            : [src] "r" (atomic)
+            : "memory"
+        );
+
+        if (current1 != exp1 || current2 != exp2) {
+            // Mismatch: report what was seen and give up.
+            expected1 = reinterpret_cast<T1>(current1);
+            expected2 = reinterpret_cast<T2>(current2);
+            return false;
+        }
+
+        asm volatile (
+            "stlxp %w[fail], %[new1], %[new2], [%[dst]]"
+            : [fail] "=&r" (fail)
+            : [new1] "r" (new1)
+            , [new2] "r" (new2)
+            , [dst] "r" (atomic)
+            : "memory"
+        );
+
+        // A non-zero status means the exclusive store did not succeed; retry.
+    } while (Y_UNLIKELY(fail));
+    return true;
+#else
+#    error Unsupported platform
+#endif
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+// Builds a head that points at |pointer|; Epoch keeps its in-class default of 0.
+template <class TItem>
+TFreeList<TItem>::THead::THead(TItem* pointer)
+ : Pointer(pointer)
+{ }
+
+// Creates an empty list (null head pointer, zero epoch).
+template <class TItem>
+TFreeList<TItem>::TFreeList()
+ : Head_()
+{ }
+
+// Takes over the whole chain of |other| via ExtractAll(), which leaves |other| empty.
+template <class TItem>
+TFreeList<TItem>::TFreeList(TFreeList<TItem>&& other)
+ : Head_(other.ExtractAll())
+{ }
+
+// The destructor releases nothing itself; it only verifies that the list is already empty.
+template <class TItem>
+TFreeList<TItem>::~TFreeList()
+{
+ YT_VERIFY(IsEmpty());
+}
+
+// Pushes the chain [head .. tail] in front of the list for as long as
+// |predicate| accepts the currently observed head.
+//
+// Returns true once the chain has been installed. If the predicate rejects the
+// observed head, resets tail->Next to null and returns false.
+template <class TItem>
+template <class TPredicate>
+Y_NO_SANITIZE("thread")
+bool TFreeList<TItem>::PutIf(TItem* head, TItem* tail, TPredicate predicate)
+{
+    auto* observedHead = Head_.Pointer.load(std::memory_order::relaxed);
+    auto observedEpoch = Head_.Epoch.load(std::memory_order::relaxed);
+
+    for (;;) {
+        if (!predicate(observedHead)) {
+            break;
+        }
+
+        // Link the chain to the head we believe is current, then try to publish it.
+        // A failed CAS refreshes observedHead/observedEpoch for the next round.
+        tail->Next.store(observedHead, std::memory_order::release);
+        const bool installed = CompareAndSet(&AtomicHead_, observedHead, observedEpoch, head, observedEpoch + 1);
+        if (installed) {
+            return true;
+        }
+    }
+
+    // Rejected: detach the chain from whatever head it was last linked to.
+    tail->Next.store(nullptr, std::memory_order::release);
+    return false;
+}
+
+
+// Pushes the chain [head .. tail] in front of the list, retrying until the
+// head swap succeeds. tail->Next is overwritten with the previous head.
+template <class TItem>
+Y_NO_SANITIZE("thread")
+void TFreeList<TItem>::Put(TItem* head, TItem* tail)
+{
+    auto* observedHead = Head_.Pointer.load(std::memory_order::relaxed);
+    auto observedEpoch = Head_.Epoch.load(std::memory_order::relaxed);
+
+    for (;;) {
+        tail->Next.store(observedHead, std::memory_order::release);
+        // On failure the CAS reloads observedHead/observedEpoch, so just go around again.
+        if (CompareAndSet(&AtomicHead_, observedHead, observedEpoch, head, observedEpoch + 1)) {
+            return;
+        }
+    }
+}
+
+// Pushes a single item: a chain whose head and tail coincide.
+template <class TItem>
+void TFreeList<TItem>::Put(TItem* item)
+{
+    Put(/*head*/ item, /*tail*/ item);
+}
+
+// Pops the front item of the list.
+// Returns the item with its Next link reset to null, or nullptr if the list is empty.
+template <class TItem>
+Y_NO_SANITIZE("thread")
+TItem* TFreeList<TItem>::Extract()
+{
+    auto* candidate = Head_.Pointer.load(std::memory_order::relaxed);
+    auto observedEpoch = Head_.Epoch.load(std::memory_order::relaxed);
+
+    for (;;) {
+        if (!candidate) {
+            return nullptr;
+        }
+
+        // Another thread may already have extracted |candidate| and may be writing
+        // to &candidate->Next. The only guarantee here is that the address is valid
+        // (the memory is not freed); a stale value is rejected by the CAS below.
+        auto* successor = candidate->Next.load(std::memory_order::acquire);
+        if (!CompareAndSet(&AtomicHead_, candidate, observedEpoch, successor, observedEpoch + 1)) {
+            // candidate/observedEpoch were refreshed by the failed CAS.
+            continue;
+        }
+
+        candidate->Next.store(nullptr, std::memory_order::release);
+        return candidate;
+    }
+}
+
+// Detaches the whole chain from the list in one step.
+// Returns the former head (items stay linked through Next), or nullptr if the list was empty.
+template <class TItem>
+TItem* TFreeList<TItem>::ExtractAll()
+{
+    auto* observedHead = Head_.Pointer.load(std::memory_order::relaxed);
+    auto observedEpoch = Head_.Epoch.load(std::memory_order::relaxed);
+
+    // Keep trying to replace a non-null head with null. A failed CAS refreshes
+    // observedHead/observedEpoch; a successful one leaves them untouched.
+    while (observedHead &&
+        !CompareAndSet<TItem*, size_t>(&AtomicHead_, observedHead, observedEpoch, nullptr, observedEpoch + 1))
+    { }
+
+    return observedHead;
+}
+
+// Returns true when the head pointer is currently null.
+template <class TItem>
+bool TFreeList<TItem>::IsEmpty() const
+{
+    auto* observedHead = Head_.Pointer.load();
+    return !observedHead;
+}
+
+// Moves every item of |other| into this list, leaving |other| empty.
+// The chain is detached from |other| in one step, its tail is located by
+// walking the Next links, and the whole chain is then pushed with Put().
+template <class TItem>
+void TFreeList<TItem>::Append(TFreeList<TItem>& other)
+{
+    auto* head = other.ExtractAll();
+    if (!head) {
+        return;
+    }
+
+    // Read each Next link exactly once: the loop condition and the advance then
+    // see the same value, and each node costs one atomic load instead of two.
+    auto* tail = head;
+    while (auto* next = tail->Next.load()) {
+        tail = next;
+    }
+
+    Put(head, tail);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT
diff --git a/library/cpp/yt/memory/free_list.h b/library/cpp/yt/memory/free_list.h
new file mode 100644
index 0000000000..8c06bcb7a5
--- /dev/null
+++ b/library/cpp/yt/memory/free_list.h
@@ -0,0 +1,72 @@
+#pragma once
+
+#include "public.h"
+
+#include <atomic>
+
+namespace NYT {
+
+////////////////////////////////////////////////////////////////////////////////
+
+// Base for items kept in a TFreeList: provides the Next link that the list
+// reads and writes. T is the concrete item type deriving from this base.
+template <class T>
+struct TFreeListItemBase
+{
+ // Next item in the chain; null when there is none.
+ std::atomic<T*> Next = nullptr;
+};
+
+// 16-byte aligned 128-bit cell; the operand type of CompareAndSet().
+using TAtomicUint128 = volatile unsigned __int128 __attribute__((aligned(16)));
+
+// Stack-like list of TItem whose head is updated with a double-width CAS.
+// Items are linked through their Next member (see TFreeListItemBase).
+template <class TItem>
+class TFreeList
+{
+private:
+ // Head of the list: pointer to the first item plus an epoch counter.
+ // Every successful head update stores epoch + 1 (presumably to guard against ABA).
+ struct THead
+ {
+ std::atomic<TItem*> Pointer = nullptr;
+ std::atomic<size_t> Epoch = 0;
+
+ THead() = default;
+
+ explicit THead(TItem* pointer);
+
+ };
+
+ // The same 16 bytes viewed either field-wise (Head_) or as a single
+ // 128-bit cell (AtomicHead_) that CompareAndSet() operates on.
+ union
+ {
+ THead Head_;
+ TAtomicUint128 AtomicHead_;
+ };
+
+ // Avoid false sharing: pad the object up to CacheLineSize bytes.
+ char Padding[CacheLineSize - sizeof(TAtomicUint128)];
+
+public:
+ // Creates an empty list.
+ TFreeList();
+
+ // Takes all items from |other|, leaving it empty.
+ TFreeList(TFreeList&& other);
+
+ // Verifies that the list is empty; items are not released here.
+ ~TFreeList();
+
+ // Pushes the chain [head .. tail] while |predicate| accepts the observed head.
+ // Returns false (and nulls tail->Next) if the predicate rejects it.
+ template <class TPredicate>
+ bool PutIf(TItem* head, TItem* tail, TPredicate predicate);
+
+ // Pushes the chain [head .. tail] in front of the list.
+ void Put(TItem* head, TItem* tail);
+
+ // Pushes a single item.
+ void Put(TItem* item);
+
+ // Pops the front item; returns nullptr if the list is empty.
+ TItem* Extract();
+
+ // Detaches and returns the whole chain; returns nullptr if the list is empty.
+ TItem* ExtractAll();
+
+ // Returns true when the head pointer is null.
+ bool IsEmpty() const;
+
+ // Moves all items of |other| into this list.
+ void Append(TFreeList& other);
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT
+
+#define FREE_LIST_INL_H_
+#include "free_list-inl.h"
+#undef FREE_LIST_INL_H_
diff --git a/library/cpp/yt/memory/public.h b/library/cpp/yt/memory/public.h
index fb1546dd59..f05a6b4569 100644
--- a/library/cpp/yt/memory/public.h
+++ b/library/cpp/yt/memory/public.h
@@ -6,6 +6,9 @@ namespace NYT {
////////////////////////////////////////////////////////////////////////////////
+// TODO(babenko): consider increasing to 128 due to cache line pairing in L2 prefetcher.
+constexpr size_t CacheLineSize = 64;
+
class TChunkedMemoryPool;
DECLARE_REFCOUNTED_STRUCT(IMemoryChunkProvider)
diff --git a/library/cpp/yt/memory/unittests/free_list_ut.cpp b/library/cpp/yt/memory/unittests/free_list_ut.cpp
new file mode 100644
index 0000000000..3d2ecf103e
--- /dev/null
+++ b/library/cpp/yt/memory/unittests/free_list_ut.cpp
@@ -0,0 +1,158 @@
+#include <library/cpp/testing/gtest/gtest.h>
+
+#include <library/cpp/yt/memory/free_list.h>
+
+#include <util/random/random.h>
+
+#include <thread>
+#include <stack>
+#include <latch>
+
+namespace NYT {
+namespace {
+
+using namespace std::chrono_literals;
+
+////////////////////////////////////////////////////////////////////////////////
+
+// Single-threaded check of the CompareAndSet() contract.
+TEST(TFreeListTest, CompareAndSet)
+{
+    TAtomicUint128 cell = 0;
+    ui64 expectedLow = 0;
+    ui64 expectedHigh = 0;
+
+    // Expected (0, 0) matches the zeroed cell, so (13, 9) is stored.
+    EXPECT_TRUE(CompareAndSet(&cell, expectedLow, expectedHigh, ui64{13}, ui64{9}));
+
+    // Expected is still (0, 0): the call fails and reports the stored pair back.
+    EXPECT_FALSE(CompareAndSet(&cell, expectedLow, expectedHigh, ui64{100}, ui64{500}));
+    EXPECT_EQ(13u, expectedLow);
+    EXPECT_EQ(9u, expectedHigh);
+
+    // With the refreshed expectation the swap goes through;
+    // the first value lands in the low half and the second in the high half.
+    EXPECT_TRUE(CompareAndSet(&cell, expectedLow, expectedHigh, ui64{100}, ui64{500}));
+    EXPECT_EQ(TAtomicUint128{500} << 64 | 100, cell);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+// Parameters of one stress-test run.
+struct TTestConfig
+{
+ // Number of worker threads.
+ ui64 Threads;
+ // Upper bound on the number of items a worker pushes per iteration.
+ ui64 MaxBatchSize;
+ // How long the workers run before being told to stop.
+ std::chrono::seconds TimeLimit;
+};
+
+// Fixture that parameterizes the stress test with a TTestConfig.
+class TFreeListStressTest
+ : public testing::TestWithParam<TTestConfig>
+{ };
+
+// Item type used by the stress test.
+//
+// Aligned to CacheLineSize so that every item starts on its own cache line and
+// its size is rounded up to a whole number of lines; this is what avoids false
+// sharing between items. A hand-sized padding array could not do that: it had to
+// account for the inherited Next link and still gave no alignment guarantee.
+struct alignas(CacheLineSize) TTestItem
+    : public NYT::TFreeListItemBase<TTestItem>
+{
+    TTestItem() = default;
+
+    // Payload written by the producer before Put() and read after Extract().
+    ui64 Value = 0;
+    // Position of this item in TTestItemSet's item storage.
+    ui64 IndexInSet = 0;
+};
+
+// Per-thread pool of test items. The items themselves live in a static storage
+// shared by all sets; a set only keeps the indices of the items it currently holds.
+class TTestItemSet
+{
+public:
+    // Destroys all items created so far.
+    static void Reset()
+    {
+        Items_.clear();
+    }
+
+    // Creates |setSize| fresh items and returns a set holding all of them.
+    static TTestItemSet Allocate(size_t setSize)
+    {
+        TTestItemSet result;
+        for (size_t remaining = setSize; remaining > 0; --remaining) {
+            auto item = std::make_unique<TTestItem>();
+            // The item is about to be appended, so its index is the current size.
+            item->IndexInSet = Items_.size();
+            result.Acquire(item.get());
+            Items_.push_back(std::move(item));
+        }
+        return result;
+    }
+
+    // Records |item| as held by this set.
+    void Acquire(TTestItem* item)
+    {
+        AcquiredItemIndices_.push(item->IndexInSet);
+    }
+
+    // Gives up the most recently acquired item; the set must not be empty.
+    TTestItem* Release()
+    {
+        YT_VERIFY(!AcquiredItemIndices_.empty());
+        const size_t slot = AcquiredItemIndices_.top();
+        AcquiredItemIndices_.pop();
+        auto* item = Items_[slot].get();
+        return item;
+    }
+
+private:
+    inline static std::vector<std::unique_ptr<TTestItem>> Items_;
+
+    std::stack<size_t> AcquiredItemIndices_;
+};
+
+
+// Several threads repeatedly push a batch of their own items and then pop the
+// same number of items (not necessarily their own). At the end the sum of all
+// pushed values must equal the sum of all popped values.
+TEST_P(TFreeListStressTest, Stress)
+{
+ TTestItemSet::Reset();
+ SetRandomSeed(0x424242);
+ auto params = GetParam();
+
+ TFreeList<TTestItem> list;
+
+ // Workers block on this latch until all of them have been started, so item
+ // allocation below is finished before any worker touches the shared storage.
+ std::latch start(params.Threads);
+
+ std::atomic<bool> running{true};
+ // Sums of item values pushed to / popped from the list.
+ std::atomic<ui64> put{0};
+ std::atomic<ui64> extracted{0};
+
+ std::vector<std::thread> workers;
+ for (ui64 i = 0; i < params.Threads; ++i) {
+ // Each worker starts with MaxBatchSize items, enough for the largest batch.
+ auto itemSet = TTestItemSet::Allocate(params.MaxBatchSize);
+ workers.emplace_back([&, params, itemSet = std::move(itemSet)]() mutable {
+ start.arrive_and_wait();
+
+ while (running.load(std::memory_order::relaxed)) {
+ // Push batch of items.
+ ui64 batchSize = 1 + RandomNumber<ui64>(params.MaxBatchSize);
+ for (ui64 i = 0; i < batchSize; ++i) {
+ auto* item = itemSet.Release();
+ item->Value = 1 + RandomNumber<ui64>(1e9);
+ put.fetch_add(item->Value, std::memory_order::relaxed);
+ list.Put(item);
+ }
+
+ // Pop batch of items.
+ // Every worker pushes before it pops, so the list should never run dry here.
+ for (ui64 i = 0; i < batchSize; ++i) {
+ auto* item = list.Extract();
+ ASSERT_NE(item, nullptr);
+ extracted.fetch_add(item->Value, std::memory_order::relaxed);
+ itemSet.Acquire(item);
+ }
+ }
+ });
+ }
+
+ // Let the workers run for the configured time, then ask them to stop.
+ Sleep(params.TimeLimit);
+ running.store(false);
+
+ for (auto& worker : workers) {
+ worker.join();
+ }
+
+ Cerr << "Put: " << put.load() << Endl;
+ Cerr << "Extracted: " << extracted.load() << Endl;
+ EXPECT_EQ(put.load(), extracted.load());
+}
+
+// Each configuration is {Threads, MaxBatchSize, TimeLimit}:
+// four threads with batches of up to 1, 3 and 5 items, 15 seconds each.
+INSTANTIATE_TEST_SUITE_P(
+ TFreeListTest,
+ TFreeListStressTest,
+ testing::Values(
+ TTestConfig{4, 1, 15s},
+ TTestConfig{4, 3, 15s},
+ TTestConfig{4, 5, 15s}));
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace
+} // namespace NYT
diff --git a/library/cpp/yt/threading/public.h b/library/cpp/yt/threading/public.h
index 92d062ac3e..ab6cb22663 100644
--- a/library/cpp/yt/threading/public.h
+++ b/library/cpp/yt/threading/public.h
@@ -6,13 +6,9 @@ namespace NYT::NThreading {
////////////////////////////////////////////////////////////////////////////////
-// TODO(babenko): consider increasing to 128 due to cache line pairing in L2 prefetcher.
-constexpr size_t CacheLineSize = 64;
-
#define YT_DECLARE_SPIN_LOCK(type, name) \
type name{__LOCATION__}
////////////////////////////////////////////////////////////////////////////////
} // namespace NYT::NThreading
-
diff --git a/library/cpp/yt/threading/rw_spin_lock.h b/library/cpp/yt/threading/rw_spin_lock.h
index 3e834dfcc9..d1235dfc4a 100644
--- a/library/cpp/yt/threading/rw_spin_lock.h
+++ b/library/cpp/yt/threading/rw_spin_lock.h
@@ -3,6 +3,8 @@
#include "public.h"
#include "spin_lock_base.h"
+#include <library/cpp/yt/memory/public.h>
+
#include <util/system/rwlock.h>
#include <atomic>
diff --git a/library/cpp/yt/threading/spin_lock.h b/library/cpp/yt/threading/spin_lock.h
index 28103af1ba..d0d5c3cdbf 100644
--- a/library/cpp/yt/threading/spin_lock.h
+++ b/library/cpp/yt/threading/spin_lock.h
@@ -7,6 +7,8 @@
#include <library/cpp/yt/system/thread_id.h>
+#include <library/cpp/yt/memory/public.h>
+
#include <util/system/src_location.h>
#include <util/system/types.h>
@@ -78,4 +80,3 @@ private:
#define SPIN_LOCK_INL_H_
#include "spin_lock-inl.h"
#undef SPIN_LOCK_INL_H_
-