diff options
| author | aleexfi <[email protected]> | 2023-05-10 13:26:37 +0300 | 
|---|---|---|
| committer | aleexfi <[email protected]> | 2023-05-10 13:26:37 +0300 | 
| commit | a5581558250508c2cae6de5225e540e12289a530 (patch) | |
| tree | 4cb62dc7dd7e67f753acf539d3515bb9c41bba67 /library/cpp | |
| parent | cc5864c4a61e0fb13950c8ad0c2067874cddcc2f (diff) | |
YT-17689: Move TFreeList to library/cpp/yt/memory
Iteration no. 2. First one reverted due to YT-18997
Diffstat (limited to 'library/cpp')
| -rw-r--r-- | library/cpp/yt/memory/free_list-inl.h | 222 | ||||
| -rw-r--r-- | library/cpp/yt/memory/free_list.h | 72 | ||||
| -rw-r--r-- | library/cpp/yt/memory/public.h | 3 | ||||
| -rw-r--r-- | library/cpp/yt/memory/unittests/free_list_ut.cpp | 158 | ||||
| -rw-r--r-- | library/cpp/yt/threading/public.h | 4 | ||||
| -rw-r--r-- | library/cpp/yt/threading/rw_spin_lock.h | 2 | ||||
| -rw-r--r-- | library/cpp/yt/threading/spin_lock.h | 3 | 
7 files changed, 459 insertions, 5 deletions
diff --git a/library/cpp/yt/memory/free_list-inl.h b/library/cpp/yt/memory/free_list-inl.h new file mode 100644 index 00000000000..d428b824982 --- /dev/null +++ b/library/cpp/yt/memory/free_list-inl.h @@ -0,0 +1,222 @@ +#ifndef FREE_LIST_INL_H_ +#error "Direct inclusion of this file is not allowed, include free_list.h" +// For the sake of sane code completion. +#include "free_list.h" +#endif + +namespace NYT { + +//////////////////////////////////////////////////////////////////////////////// + +// DCAS is supported in Clang with option -mcx16, is not supported in GCC. See following links. +// https://gcc.gnu.org/bugzilla/show_bug.cgi?id=84522 +// https://gcc.gnu.org/bugzilla/show_bug.cgi?id=80878 + +template <class T1, class T2> +Y_FORCE_INLINE bool CompareAndSet( +    TAtomicUint128* atomic, +    T1& expected1, +    T2& expected2, +    T1 new1, +    T2 new2) +{ +#if defined(__x86_64__) +    bool success; +    __asm__ __volatile__ +    ( +        "lock cmpxchg16b %1\n" +        "setz %0" +        : "=q"(success) +        , "+m"(*atomic) +        , "+a"(expected1) +        , "+d"(expected2) +        : "b"(new1) +        , "c"(new2) +        : "cc" +    ); +    return success; +#elif defined(__arm64__) || (defined(__aarch64__) && defined(RTE_ARM_FEATURE_ATOMICS)) +    register ui64 x0 __asm("x0") = (ui64)expected1; +    register ui64 x1 __asm("x1") = (ui64)expected2; +    register ui64 x2 __asm("x2") = (ui64)new1; +    register ui64 x3 __asm("x3") = (ui64)new2; +    ui64 old1 = (ui64)expected1; +    ui64 old2 = (ui64)expected2; +    asm volatile +    ( +#if defined(RTE_CC_CLANG) +        ".arch armv8-a+lse\n" +#endif +        "caspal %[old0], %[old1], %[upd0], %[upd1], [%[dst]]" +        : [old0] "+r" (x0) +        , [old1] "+r" (x1) +        : [upd0] "r" (x2) +        , [upd1] "r" (x3) +        , [dst] "r" (atomic) +        : "memory" +    ); +    expected1 = (T1)x0; +    expected2 = (T2)x1; +    return x0 == old1 && x1 == old2; +#elif defined(__aarch64__) +    
ui64 exp1 = reinterpret_cast<ui64>(expected1); +    ui64 exp2 = reinterpret_cast<ui64>(expected2); +    ui32 fail = 0; + +    do { +        ui64 current1 = 0; +        ui64 current2 = 0; +        asm volatile ( +            "ldaxp %[cur1], %[cur2], [%[src]]" +            : [cur1] "=r" (current1) +            , [cur2] "=r" (current2) +            : [src] "r" (atomic) +            : "memory" +        ); + +        if (current1 != exp1 || current2 != exp2) { +            expected1 = reinterpret_cast<T1>(current1); +            expected2 = reinterpret_cast<T2>(current2); +            return false; +        } + +        asm volatile ( +            "stlxp %w[fail], %[new1], %[new2], [%[dst]]" +            : [fail] "=&r" (fail) +            : [new1] "r" (new1) +            , [new2] "r" (new2) +            , [dst] "r" (atomic) +            : "memory" +        ); + +    } while (Y_UNLIKELY(fail)); +    return true; +#else +#    error Unsupported platform +#endif +} + +//////////////////////////////////////////////////////////////////////////////// + +template <class TItem> +TFreeList<TItem>::THead::THead(TItem* pointer) +    : Pointer(pointer) +{ } + +template <class TItem> +TFreeList<TItem>::TFreeList() +    : Head_() +{ } + +template <class TItem> +TFreeList<TItem>::TFreeList(TFreeList<TItem>&& other) +    : Head_(other.ExtractAll()) +{ } + +template <class TItem> +TFreeList<TItem>::~TFreeList() +{ +    YT_VERIFY(IsEmpty()); +} + +template <class TItem> +template <class TPredicate> +Y_NO_SANITIZE("thread") +bool TFreeList<TItem>::PutIf(TItem* head, TItem* tail, TPredicate predicate) +{ +    auto* current = Head_.Pointer.load(std::memory_order::relaxed); +    auto epoch = Head_.Epoch.load(std::memory_order::relaxed); + +    while (predicate(current)) { +        tail->Next.store(current, std::memory_order::release); +        if (CompareAndSet(&AtomicHead_, current, epoch, head, epoch + 1)) { +            return true; +        } +    } + +    tail->Next.store(nullptr, 
std::memory_order::release); + +    return false; +} + + +template <class TItem> +Y_NO_SANITIZE("thread") +void TFreeList<TItem>::Put(TItem* head, TItem* tail) +{ +    auto* current = Head_.Pointer.load(std::memory_order::relaxed); +    auto epoch = Head_.Epoch.load(std::memory_order::relaxed); + +    do { +        tail->Next.store(current, std::memory_order::release); +    } while (!CompareAndSet(&AtomicHead_, current, epoch, head, epoch + 1)); +} + +template <class TItem> +void TFreeList<TItem>::Put(TItem* item) +{ +    Put(item, item); +} + +template <class TItem> +Y_NO_SANITIZE("thread") +TItem* TFreeList<TItem>::Extract() +{ +    auto* current = Head_.Pointer.load(std::memory_order::relaxed); +    auto epoch = Head_.Epoch.load(std::memory_order::relaxed); + +    while (current) { +        // If current node is already extracted by other thread +        // there can be any writes at address &current->Next. +        // The only guaranteed thing is that address is valid (memory is not freed). 
+        auto next = current->Next.load(std::memory_order::acquire); +        if (CompareAndSet(&AtomicHead_, current, epoch, next, epoch + 1)) { +            current->Next.store(nullptr, std::memory_order::release); +            return current; +        } +    } + +    return nullptr; +} + +template <class TItem> +TItem* TFreeList<TItem>::ExtractAll() +{ +    auto* current = Head_.Pointer.load(std::memory_order::relaxed); +    auto epoch = Head_.Epoch.load(std::memory_order::relaxed); + +    while (current) { +        if (CompareAndSet<TItem*, size_t>(&AtomicHead_, current, epoch, nullptr, epoch + 1)) { +            return current; +        } +    } + +    return nullptr; +} + +template <class TItem> +bool TFreeList<TItem>::IsEmpty() const +{ +    return Head_.Pointer.load() == nullptr; +} + +template <class TItem> +void TFreeList<TItem>::Append(TFreeList<TItem>& other) +{ +    auto* head = other.ExtractAll(); + +    if (!head) { +        return; +    } + +    auto* tail = head; +    while (tail->Next) { +        tail = tail->Next; +    } + +    Put(head, tail); +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT diff --git a/library/cpp/yt/memory/free_list.h b/library/cpp/yt/memory/free_list.h new file mode 100644 index 00000000000..8c06bcb7a53 --- /dev/null +++ b/library/cpp/yt/memory/free_list.h @@ -0,0 +1,72 @@ +#pragma once + +#include "public.h" + +#include <atomic> + +namespace NYT { + +//////////////////////////////////////////////////////////////////////////////// + +template <class T> +struct TFreeListItemBase +{ +    std::atomic<T*> Next = nullptr; +}; + +using TAtomicUint128 = volatile unsigned __int128  __attribute__((aligned(16))); + +template <class TItem> +class TFreeList +{ +private: +    struct THead +    { +        std::atomic<TItem*> Pointer = nullptr; +        std::atomic<size_t> Epoch = 0; + +        THead() = default; + +        explicit THead(TItem* pointer); + +    }; + +    union +  
  { +        THead Head_; +        TAtomicUint128 AtomicHead_; +    }; + +    // Avoid false sharing. +    char Padding[CacheLineSize - sizeof(TAtomicUint128)]; + +public: +    TFreeList(); + +    TFreeList(TFreeList&& other); + +    ~TFreeList(); + +    template <class TPredicate> +    bool PutIf(TItem* head, TItem* tail, TPredicate predicate); + +    void Put(TItem* head, TItem* tail); + +    void Put(TItem* item); + +    TItem* Extract(); + +    TItem* ExtractAll(); + +    bool IsEmpty() const; + +    void Append(TFreeList& other); +}; + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT + +#define FREE_LIST_INL_H_ +#include "free_list-inl.h" +#undef FREE_LIST_INL_H_ diff --git a/library/cpp/yt/memory/public.h b/library/cpp/yt/memory/public.h index fb1546dd595..f05a6b45693 100644 --- a/library/cpp/yt/memory/public.h +++ b/library/cpp/yt/memory/public.h @@ -6,6 +6,9 @@ namespace NYT {  //////////////////////////////////////////////////////////////////////////////// +// TODO(babenko): consider increasing to 128 due to cache line pairing in L2 prefetcher. 
+constexpr size_t CacheLineSize = 64; +  class TChunkedMemoryPool;  DECLARE_REFCOUNTED_STRUCT(IMemoryChunkProvider) diff --git a/library/cpp/yt/memory/unittests/free_list_ut.cpp b/library/cpp/yt/memory/unittests/free_list_ut.cpp new file mode 100644 index 00000000000..3d2ecf103ef --- /dev/null +++ b/library/cpp/yt/memory/unittests/free_list_ut.cpp @@ -0,0 +1,158 @@ +#include <library/cpp/testing/gtest/gtest.h> + +#include <library/cpp/yt/memory/free_list.h> + +#include <util/random/random.h> + +#include <thread> +#include <stack> +#include <latch> + +namespace NYT { +namespace { + +using namespace std::chrono_literals; + +//////////////////////////////////////////////////////////////////////////////// + +TEST(TFreeListTest, CompareAndSet) +{ +    TAtomicUint128 v = 0; +    ui64 p1 = 0; +    ui64 p2 = 0; +    EXPECT_TRUE(CompareAndSet(&v, p1, p2, ui64{13}, ui64{9})); +    EXPECT_FALSE(CompareAndSet(&v, p1, p2, ui64{100}, ui64{500})); +    EXPECT_EQ(13u, p1); +    EXPECT_EQ(9u, p2); +    EXPECT_TRUE(CompareAndSet(&v, p1, p2, ui64{100}, ui64{500})); +    EXPECT_EQ(TAtomicUint128{500} << 64 | 100, v); +} + +//////////////////////////////////////////////////////////////////////////////// + +struct TTestConfig +{ +    ui64 Threads; +    ui64 MaxBatchSize; +    std::chrono::seconds TimeLimit; +}; + +class TFreeListStressTest +    : public testing::TestWithParam<TTestConfig> +{ }; + +struct TTestItem +    : public NYT::TFreeListItemBase<TTestItem> +{ +    TTestItem() = default; + +    ui64 Value = 0; +    ui64 IndexInSet = 0; +    // Avoid false sharing. 
+    char Padding[CacheLineSize - 2 * sizeof(ui64)]; +}; + +class TTestItemSet +{ +public: +    static void Reset() +    { +        Items_.clear(); +    } + +    static TTestItemSet Allocate(size_t setSize) +    { +        TTestItemSet set; +        for (size_t i = 0; i < setSize; ++i) { +            Items_.push_back(std::make_unique<TTestItem>()); +            Items_.back()->IndexInSet = Items_.size() - 1; +            set.Acquire(Items_.back().get()); +        } +        return set; +    } + +    void Acquire(TTestItem* item) +    { +        AcquiredItemIndices_.push(item->IndexInSet); +    } + +    TTestItem* Release() +    { +        YT_VERIFY(!AcquiredItemIndices_.empty()); +        size_t index = AcquiredItemIndices_.top(); +        AcquiredItemIndices_.pop(); +        return Items_[index].get(); +    } + +private: +    inline static std::vector<std::unique_ptr<TTestItem>> Items_; + +    std::stack<size_t> AcquiredItemIndices_; +}; + + +TEST_P(TFreeListStressTest, Stress) +{ +    TTestItemSet::Reset(); +    SetRandomSeed(0x424242); +    auto params = GetParam(); + +    TFreeList<TTestItem> list; + +    std::latch start(params.Threads); + +    std::atomic<bool> running{true}; +    std::atomic<ui64> put{0}; +    std::atomic<ui64> extracted{0}; + +    std::vector<std::thread> workers; +    for (ui64 i = 0; i < params.Threads; ++i) { +        auto itemSet = TTestItemSet::Allocate(params.MaxBatchSize); +        workers.emplace_back([&, params, itemSet = std::move(itemSet)]() mutable { +            start.arrive_and_wait(); + +            while (running.load(std::memory_order::relaxed)) { +                // Push batch of items. 
+                ui64 batchSize = 1 + RandomNumber<ui64>(params.MaxBatchSize); +                for (ui64 i = 0; i < batchSize; ++i) { +                    auto* item = itemSet.Release(); +                    item->Value = 1 + RandomNumber<ui64>(1e9); +                    put.fetch_add(item->Value, std::memory_order::relaxed); +                    list.Put(item); +                } + +                // Pop batch of items. +                for (ui64 i = 0; i < batchSize; ++i) { +                    auto* item = list.Extract(); +                    ASSERT_NE(item, nullptr); +                    extracted.fetch_add(item->Value, std::memory_order::relaxed); +                    itemSet.Acquire(item); +                } +            } +        }); +    } + +    Sleep(params.TimeLimit); +    running.store(false); + +    for (auto& worker : workers) { +        worker.join(); +    } + +    Cerr << "Put: " << put.load() << Endl; +    Cerr << "Extracted: " << extracted.load() << Endl; +    EXPECT_EQ(put.load(), extracted.load()); +} + +INSTANTIATE_TEST_SUITE_P( +    TFreeListTest, +    TFreeListStressTest, +    testing::Values( +        TTestConfig{4, 1, 15s}, +        TTestConfig{4, 3, 15s}, +        TTestConfig{4, 5, 15s})); + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace +} // namespace NYT diff --git a/library/cpp/yt/threading/public.h b/library/cpp/yt/threading/public.h index 92d062ac3e1..ab6cb226633 100644 --- a/library/cpp/yt/threading/public.h +++ b/library/cpp/yt/threading/public.h @@ -6,13 +6,9 @@ namespace NYT::NThreading {  //////////////////////////////////////////////////////////////////////////////// -// TODO(babenko): consider increasing to 128 due to cache line pairing in L2 prefetcher. 
-constexpr size_t CacheLineSize = 64; -  #define YT_DECLARE_SPIN_LOCK(type, name) \      type name{__LOCATION__}  ////////////////////////////////////////////////////////////////////////////////  } // namespace NYT::NThreading - diff --git a/library/cpp/yt/threading/rw_spin_lock.h b/library/cpp/yt/threading/rw_spin_lock.h index 3e834dfcc9d..d1235dfc4a5 100644 --- a/library/cpp/yt/threading/rw_spin_lock.h +++ b/library/cpp/yt/threading/rw_spin_lock.h @@ -3,6 +3,8 @@  #include "public.h"  #include "spin_lock_base.h" +#include <library/cpp/yt/memory/public.h> +  #include <util/system/rwlock.h>  #include <atomic> diff --git a/library/cpp/yt/threading/spin_lock.h b/library/cpp/yt/threading/spin_lock.h index 28103af1baa..d0d5c3cdbf1 100644 --- a/library/cpp/yt/threading/spin_lock.h +++ b/library/cpp/yt/threading/spin_lock.h @@ -7,6 +7,8 @@  #include <library/cpp/yt/system/thread_id.h> +#include <library/cpp/yt/memory/public.h> +  #include <util/system/src_location.h>  #include <util/system/types.h> @@ -78,4 +80,3 @@ private:  #define SPIN_LOCK_INL_H_  #include "spin_lock-inl.h"  #undef SPIN_LOCK_INL_H_ -  | 
