diff options
author | aleexfi <aleexfi@yandex-team.com> | 2023-05-10 13:26:37 +0300 |
---|---|---|
committer | aleexfi <aleexfi@yandex-team.com> | 2023-05-10 13:26:37 +0300 |
commit | a5581558250508c2cae6de5225e540e12289a530 (patch) | |
tree | 4cb62dc7dd7e67f753acf539d3515bb9c41bba67 | |
parent | cc5864c4a61e0fb13950c8ad0c2067874cddcc2f (diff) | |
download | ydb-a5581558250508c2cae6de5225e540e12289a530.tar.gz |
YT-17689: Move TFreeList to library/cpp/yt/memory
Iteration no. 2. First one reverted due to YT-18997
-rw-r--r-- | contrib/libs/cxxsupp/libcxx/include/latch | 113 | ||||
-rw-r--r-- | library/cpp/yt/memory/free_list-inl.h | 222 | ||||
-rw-r--r-- | library/cpp/yt/memory/free_list.h | 72 | ||||
-rw-r--r-- | library/cpp/yt/memory/public.h | 3 | ||||
-rw-r--r-- | library/cpp/yt/memory/unittests/free_list_ut.cpp | 158 | ||||
-rw-r--r-- | library/cpp/yt/threading/public.h | 4 | ||||
-rw-r--r-- | library/cpp/yt/threading/rw_spin_lock.h | 2 | ||||
-rw-r--r-- | library/cpp/yt/threading/spin_lock.h | 3 |
8 files changed, 572 insertions, 5 deletions
diff --git a/contrib/libs/cxxsupp/libcxx/include/latch b/contrib/libs/cxxsupp/libcxx/include/latch new file mode 100644 index 0000000000..e1e15190ae --- /dev/null +++ b/contrib/libs/cxxsupp/libcxx/include/latch @@ -0,0 +1,113 @@ +// -*- C++ -*- +//===----------------------------------------------------------------------===// +// +// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions. +// See https://llvm.org/LICENSE.txt for license information. +// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception +// +//===----------------------------------------------------------------------===// + +#ifndef _LIBCPP_LATCH +#define _LIBCPP_LATCH + +/* + latch synopsis + +namespace std +{ + + class latch + { + public: + static constexpr ptrdiff_t max() noexcept; + + constexpr explicit latch(ptrdiff_t __expected); + ~latch(); + + latch(const latch&) = delete; + latch& operator=(const latch&) = delete; + + void count_down(ptrdiff_t __update = 1); + bool try_wait() const noexcept; + void wait() const; + void arrive_and_wait(ptrdiff_t __update = 1); + + private: + ptrdiff_t __counter; // exposition only + }; + +} + +*/ + +#include <__availability> +#include <__config> +#include <atomic> +#include <version> + +#if !defined(_LIBCPP_HAS_NO_PRAGMA_SYSTEM_HEADER) +# pragma GCC system_header +#endif + +#ifdef _LIBCPP_HAS_NO_THREADS +# error <latch> is not supported on this single threaded system +#endif + +_LIBCPP_PUSH_MACROS +#include <__undef_macros> + +#if _LIBCPP_STD_VER >= 14 + +_LIBCPP_BEGIN_NAMESPACE_STD + +class latch +{ + __atomic_base<ptrdiff_t> __a; + +public: + static constexpr ptrdiff_t max() noexcept { + return numeric_limits<ptrdiff_t>::max(); + } + + inline _LIBCPP_INLINE_VISIBILITY + constexpr explicit latch(ptrdiff_t __expected) : __a(__expected) { } + + ~latch() = default; + latch(const latch&) = delete; + latch& operator=(const latch&) = delete; + + inline _LIBCPP_AVAILABILITY_SYNC _LIBCPP_INLINE_VISIBILITY + void count_down(ptrdiff_t __update = 1) + { + auto const __old = __a.fetch_sub(__update, memory_order_release); + if(__old == __update) + __a.notify_all(); + } + inline _LIBCPP_INLINE_VISIBILITY + bool try_wait() const noexcept + { + return 0 == __a.load(memory_order_acquire); + } + inline _LIBCPP_AVAILABILITY_SYNC _LIBCPP_INLINE_VISIBILITY + void wait() const + { + auto const __test_fn = [=]() -> bool { + return try_wait(); + }; + __cxx_atomic_wait(&__a.__a_, __test_fn); + } + inline _LIBCPP_AVAILABILITY_SYNC _LIBCPP_INLINE_VISIBILITY + void arrive_and_wait(ptrdiff_t __update = 1) + { + count_down(__update); + wait(); + } +}; + +_LIBCPP_END_NAMESPACE_STD + +#endif // _LIBCPP_STD_VER >= 14 + +_LIBCPP_POP_MACROS + +#endif //_LIBCPP_LATCH diff --git a/library/cpp/yt/memory/free_list-inl.h b/library/cpp/yt/memory/free_list-inl.h new file mode 100644 index 0000000000..d428b82498 --- /dev/null +++ b/library/cpp/yt/memory/free_list-inl.h @@ -0,0 +1,222 @@ +#ifndef FREE_LIST_INL_H_ +#error "Direct inclusion of this file is not allowed, include free_list.h" +// For the sake of sane code completion. +#include "free_list.h" +#endif + +namespace NYT { + +//////////////////////////////////////////////////////////////////////////////// + +// DCAS is supported in Clang with option -mcx16, is not supported in GCC. See following links. +// https://gcc.gnu.org/bugzilla/show_bug.cgi?id=84522 +// https://gcc.gnu.org/bugzilla/show_bug.cgi?id=80878 + +template <class T1, class T2> +Y_FORCE_INLINE bool CompareAndSet( + TAtomicUint128* atomic, + T1& expected1, + T2& expected2, + T1 new1, + T2 new2) +{ +#if defined(__x86_64__) + bool success; + __asm__ __volatile__ + ( + "lock cmpxchg16b %1\n" + "setz %0" + : "=q"(success) + , "+m"(*atomic) + , "+a"(expected1) + , "+d"(expected2) + : "b"(new1) + , "c"(new2) + : "cc" + ); + return success; +#elif defined(__arm64__) || (defined(__aarch64__) && defined(RTE_ARM_FEATURE_ATOMICS)) + register ui64 x0 __asm("x0") = (ui64)expected1; + register ui64 x1 __asm("x1") = (ui64)expected2; + register ui64 x2 __asm("x2") = (ui64)new1; + register ui64 x3 __asm("x3") = (ui64)new2; + ui64 old1 = (ui64)expected1; + ui64 old2 = (ui64)expected2; + asm volatile + ( +#if defined(RTE_CC_CLANG) + ".arch armv8-a+lse\n" +#endif + "caspal %[old0], %[old1], %[upd0], %[upd1], [%[dst]]" + : [old0] "+r" (x0) + , [old1] "+r" (x1) + : [upd0] "r" (x2) + , [upd1] "r" (x3) + , [dst] "r" (atomic) + : "memory" + ); + expected1 = (T1)x0; + expected2 = (T2)x1; + return x0 == old1 && x1 == old2; +#elif defined(__aarch64__) + ui64 exp1 = reinterpret_cast<ui64>(expected1); + ui64 exp2 = reinterpret_cast<ui64>(expected2); + ui32 fail = 0; + + do { + ui64 current1 = 0; + ui64 current2 = 0; + asm volatile ( + "ldaxp %[cur1], %[cur2], [%[src]]" + : [cur1] "=r" (current1) + , [cur2] "=r" (current2) + : [src] "r" (atomic) + : "memory" + ); + + if (current1 != exp1 || current2 != exp2) { + expected1 = reinterpret_cast<T1>(current1); + expected2 = reinterpret_cast<T2>(current2); + return false; + } + + asm volatile ( + "stlxp %w[fail], %[new1], %[new2], [%[dst]]" + : [fail] "=&r" (fail) + : [new1] "r" (new1) + , [new2] "r" (new2) + , [dst] "r" (atomic) + : "memory" + ); + + } while (Y_UNLIKELY(fail)); + return true; +#else +# error Unsupported platform +#endif +} + +//////////////////////////////////////////////////////////////////////////////// + +template <class TItem> +TFreeList<TItem>::THead::THead(TItem* pointer) + : Pointer(pointer) +{ } + +template <class TItem> +TFreeList<TItem>::TFreeList() + : Head_() +{ } + +template <class TItem> +TFreeList<TItem>::TFreeList(TFreeList<TItem>&& other) + : Head_(other.ExtractAll()) +{ } + +template <class TItem> +TFreeList<TItem>::~TFreeList() +{ + YT_VERIFY(IsEmpty()); +} + +template <class TItem> +template <class TPredicate> +Y_NO_SANITIZE("thread") +bool TFreeList<TItem>::PutIf(TItem* head, TItem* tail, TPredicate predicate) +{ + auto* current = Head_.Pointer.load(std::memory_order::relaxed); + auto epoch = Head_.Epoch.load(std::memory_order::relaxed); + + while (predicate(current)) { + tail->Next.store(current, std::memory_order::release); + if (CompareAndSet(&AtomicHead_, current, epoch, head, epoch + 1)) { + return true; + } + } + + tail->Next.store(nullptr, std::memory_order::release); + + return false; +} + + +template <class TItem> +Y_NO_SANITIZE("thread") +void TFreeList<TItem>::Put(TItem* head, TItem* tail) +{ + auto* current = Head_.Pointer.load(std::memory_order::relaxed); + auto epoch = Head_.Epoch.load(std::memory_order::relaxed); + + do { + tail->Next.store(current, std::memory_order::release); + } while (!CompareAndSet(&AtomicHead_, current, epoch, head, epoch + 1)); +} + +template <class TItem> +void TFreeList<TItem>::Put(TItem* item) +{ + Put(item, item); +} + +template <class TItem> +Y_NO_SANITIZE("thread") +TItem* TFreeList<TItem>::Extract() +{ + auto* current = Head_.Pointer.load(std::memory_order::relaxed); + auto epoch = Head_.Epoch.load(std::memory_order::relaxed); + + while (current) { + // If current node is already extracted by other thread + // there can be any writes at address ¤t->Next. + // The only guaranteed thing is that address is valid (memory is not freed). + auto next = current->Next.load(std::memory_order::acquire); + if (CompareAndSet(&AtomicHead_, current, epoch, next, epoch + 1)) { + current->Next.store(nullptr, std::memory_order::release); + return current; + } + } + + return nullptr; +} + +template <class TItem> +TItem* TFreeList<TItem>::ExtractAll() +{ + auto* current = Head_.Pointer.load(std::memory_order::relaxed); + auto epoch = Head_.Epoch.load(std::memory_order::relaxed); + + while (current) { + if (CompareAndSet<TItem*, size_t>(&AtomicHead_, current, epoch, nullptr, epoch + 1)) { + return current; + } + } + + return nullptr; +} + +template <class TItem> +bool TFreeList<TItem>::IsEmpty() const +{ + return Head_.Pointer.load() == nullptr; +} + +template <class TItem> +void TFreeList<TItem>::Append(TFreeList<TItem>& other) +{ + auto* head = other.ExtractAll(); + + if (!head) { + return; + } + + auto* tail = head; + while (tail->Next) { + tail = tail->Next; + } + + Put(head, tail); +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT diff --git a/library/cpp/yt/memory/free_list.h b/library/cpp/yt/memory/free_list.h new file mode 100644 index 0000000000..8c06bcb7a5 --- /dev/null +++ b/library/cpp/yt/memory/free_list.h @@ -0,0 +1,72 @@ +#pragma once + +#include "public.h" + +#include <atomic> + +namespace NYT { + +//////////////////////////////////////////////////////////////////////////////// + +template <class T> +struct TFreeListItemBase +{ + std::atomic<T*> Next = nullptr; +}; + +using TAtomicUint128 = volatile unsigned __int128 __attribute__((aligned(16))); + +template <class TItem> +class TFreeList +{ +private: + struct THead + { + std::atomic<TItem*> Pointer = nullptr; + std::atomic<size_t> Epoch = 0; + + THead() = default; + + explicit THead(TItem* pointer); + + }; + + union + { + THead Head_; + TAtomicUint128 AtomicHead_; + }; + + // Avoid false sharing. + char Padding[CacheLineSize - sizeof(TAtomicUint128)]; + +public: + TFreeList(); + + TFreeList(TFreeList&& other); + + ~TFreeList(); + + template <class TPredicate> + bool PutIf(TItem* head, TItem* tail, TPredicate predicate); + + void Put(TItem* head, TItem* tail); + + void Put(TItem* item); + + TItem* Extract(); + + TItem* ExtractAll(); + + bool IsEmpty() const; + + void Append(TFreeList& other); +}; + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT + +#define FREE_LIST_INL_H_ +#include "free_list-inl.h" +#undef FREE_LIST_INL_H_ diff --git a/library/cpp/yt/memory/public.h b/library/cpp/yt/memory/public.h index fb1546dd59..f05a6b4569 100644 --- a/library/cpp/yt/memory/public.h +++ b/library/cpp/yt/memory/public.h @@ -6,6 +6,9 @@ namespace NYT { //////////////////////////////////////////////////////////////////////////////// +// TODO(babenko): consider increasing to 128 due to cache line pairing in L2 prefetcher. +constexpr size_t CacheLineSize = 64; + class TChunkedMemoryPool; DECLARE_REFCOUNTED_STRUCT(IMemoryChunkProvider) diff --git a/library/cpp/yt/memory/unittests/free_list_ut.cpp b/library/cpp/yt/memory/unittests/free_list_ut.cpp new file mode 100644 index 0000000000..3d2ecf103e --- /dev/null +++ b/library/cpp/yt/memory/unittests/free_list_ut.cpp @@ -0,0 +1,158 @@ +#include <library/cpp/testing/gtest/gtest.h> + +#include <library/cpp/yt/memory/free_list.h> + +#include <util/random/random.h> + +#include <thread> +#include <stack> +#include <latch> + +namespace NYT { +namespace { + +using namespace std::chrono_literals; + +//////////////////////////////////////////////////////////////////////////////// + +TEST(TFreeListTest, CompareAndSet) +{ + TAtomicUint128 v = 0; + ui64 p1 = 0; + ui64 p2 = 0; + EXPECT_TRUE(CompareAndSet(&v, p1, p2, ui64{13}, ui64{9})); + EXPECT_FALSE(CompareAndSet(&v, p1, p2, ui64{100}, ui64{500})); + EXPECT_EQ(13u, p1); + EXPECT_EQ(9u, p2); + EXPECT_TRUE(CompareAndSet(&v, p1, p2, ui64{100}, ui64{500})); + EXPECT_EQ(TAtomicUint128{500} << 64 | 100, v); +} + +//////////////////////////////////////////////////////////////////////////////// + +struct TTestConfig +{ + ui64 Threads; + ui64 MaxBatchSize; + std::chrono::seconds TimeLimit; +}; + +class TFreeListStressTest + : public testing::TestWithParam<TTestConfig> +{ }; + +struct TTestItem + : public NYT::TFreeListItemBase<TTestItem> +{ + TTestItem() = default; + + ui64 Value = 0; + ui64 IndexInSet = 0; + // Avoid false sharing. + char Padding[CacheLineSize - 2 * sizeof(ui64)]; +}; + +class TTestItemSet +{ +public: + static void Reset() + { + Items_.clear(); + } + + static TTestItemSet Allocate(size_t setSize) + { + TTestItemSet set; + for (size_t i = 0; i < setSize; ++i) { + Items_.push_back(std::make_unique<TTestItem>()); + Items_.back()->IndexInSet = Items_.size() - 1; + set.Acquire(Items_.back().get()); + } + return set; + } + + void Acquire(TTestItem* item) + { + AcquiredItemIndices_.push(item->IndexInSet); + } + + TTestItem* Release() + { + YT_VERIFY(!AcquiredItemIndices_.empty()); + size_t index = AcquiredItemIndices_.top(); + AcquiredItemIndices_.pop(); + return Items_[index].get(); + } + +private: + inline static std::vector<std::unique_ptr<TTestItem>> Items_; + + std::stack<size_t> AcquiredItemIndices_; +}; + + +TEST_P(TFreeListStressTest, Stress) +{ + TTestItemSet::Reset(); + SetRandomSeed(0x424242); + auto params = GetParam(); + + TFreeList<TTestItem> list; + + std::latch start(params.Threads); + + std::atomic<bool> running{true}; + std::atomic<ui64> put{0}; + std::atomic<ui64> extracted{0}; + + std::vector<std::thread> workers; + for (ui64 i = 0; i < params.Threads; ++i) { + auto itemSet = TTestItemSet::Allocate(params.MaxBatchSize); + workers.emplace_back([&, params, itemSet = std::move(itemSet)]() mutable { + start.arrive_and_wait(); + + while (running.load(std::memory_order::relaxed)) { + // Push batch of items. + ui64 batchSize = 1 + RandomNumber<ui64>(params.MaxBatchSize); + for (ui64 i = 0; i < batchSize; ++i) { + auto* item = itemSet.Release(); + item->Value = 1 + RandomNumber<ui64>(1e9); + put.fetch_add(item->Value, std::memory_order::relaxed); + list.Put(item); + } + + // Pop batch of items. + for (ui64 i = 0; i < batchSize; ++i) { + auto* item = list.Extract(); + ASSERT_NE(item, nullptr); + extracted.fetch_add(item->Value, std::memory_order::relaxed); + itemSet.Acquire(item); + } + } + }); + } + + Sleep(params.TimeLimit); + running.store(false); + + for (auto& worker : workers) { + worker.join(); + } + + Cerr << "Put: " << put.load() << Endl; + Cerr << "Extracted: " << extracted.load() << Endl; + EXPECT_EQ(put.load(), extracted.load()); +} + +INSTANTIATE_TEST_SUITE_P( + TFreeListTest, + TFreeListStressTest, + testing::Values( + TTestConfig{4, 1, 15s}, + TTestConfig{4, 3, 15s}, + TTestConfig{4, 5, 15s})); + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace +} // namespace NYT diff --git a/library/cpp/yt/threading/public.h b/library/cpp/yt/threading/public.h index 92d062ac3e..ab6cb22663 100644 --- a/library/cpp/yt/threading/public.h +++ b/library/cpp/yt/threading/public.h @@ -6,13 +6,9 @@ namespace NYT::NThreading { //////////////////////////////////////////////////////////////////////////////// -// TODO(babenko): consider increasing to 128 due to cache line pairing in L2 prefetcher. -constexpr size_t CacheLineSize = 64; - #define YT_DECLARE_SPIN_LOCK(type, name) \ type name{__LOCATION__} //////////////////////////////////////////////////////////////////////////////// } // namespace NYT::NThreading - diff --git a/library/cpp/yt/threading/rw_spin_lock.h b/library/cpp/yt/threading/rw_spin_lock.h index 3e834dfcc9..d1235dfc4a 100644 --- a/library/cpp/yt/threading/rw_spin_lock.h +++ b/library/cpp/yt/threading/rw_spin_lock.h @@ -3,6 +3,8 @@ #include "public.h" #include "spin_lock_base.h" +#include <library/cpp/yt/memory/public.h> + #include <util/system/rwlock.h> #include <atomic> diff --git a/library/cpp/yt/threading/spin_lock.h b/library/cpp/yt/threading/spin_lock.h index 28103af1ba..d0d5c3cdbf 100644 --- a/library/cpp/yt/threading/spin_lock.h +++ b/library/cpp/yt/threading/spin_lock.h @@ -7,6 +7,8 @@ #include <library/cpp/yt/system/thread_id.h> +#include <library/cpp/yt/memory/public.h> + #include <util/system/src_location.h> #include <util/system/types.h> @@ -78,4 +80,3 @@ private: #define SPIN_LOCK_INL_H_ #include "spin_lock-inl.h" #undef SPIN_LOCK_INL_H_ - |