diff options
author | babenko <babenko@yandex-team.com> | 2024-11-30 19:44:10 +0300 |
---|---|---|
committer | babenko <babenko@yandex-team.com> | 2024-11-30 19:58:49 +0300 |
commit | 1a14888488e86ce1cea6ed977e98b65a24577b44 (patch) | |
tree | 5a2d38a6b0f209c9b27bfa5af147dfad4a5545c8 | |
parent | 878e5db788b99c7ea5be8edaa14c22eace57e991 (diff) | |
download | ydb-1a14888488e86ce1cea6ed977e98b65a24577b44.tar.gz |
Deprecate memory tags API in yt codebase
commit_hash:90937e5cd8a5f01663a1f162955925e299c3d892
-rw-r--r-- | yt/yt/core/concurrency/fiber_scheduler_thread.cpp | 15 | ||||
-rw-r--r-- | yt/yt/core/concurrency/fls-inl.h | 3 | ||||
-rw-r--r-- | yt/yt/core/concurrency/unittests/scheduler_ut.cpp | 63 | ||||
-rw-r--r-- | yt/yt/core/misc/ref_counted_tracker.cpp | 6 | ||||
-rw-r--r-- | yt/yt/core/misc/unittests/memory_tag_ut.cpp | 243 | ||||
-rw-r--r-- | yt/yt/core/misc/unittests/ya.make | 1 | ||||
-rw-r--r-- | yt/yt/core/rpc/unittests/rpc_allocation_tags_ut.cpp | 6 | ||||
-rw-r--r-- | yt/yt/core/tracing/unittests/allocation_tags_ut.cpp | 6 | ||||
-rw-r--r-- | yt/yt/library/ytprof/unittests/heap_profiler_ut.cpp | 8 |
9 files changed, 10 insertions, 341 deletions
diff --git a/yt/yt/core/concurrency/fiber_scheduler_thread.cpp b/yt/yt/core/concurrency/fiber_scheduler_thread.cpp index 72130a6717..f4af46f471 100644 --- a/yt/yt/core/concurrency/fiber_scheduler_thread.cpp +++ b/yt/yt/core/concurrency/fiber_scheduler_thread.cpp @@ -20,8 +20,6 @@ #include <library/cpp/yt/global/variable.h> -#include <library/cpp/yt/memory/memory_tag.h> - #include <library/cpp/yt/memory/function_view.h> #include <library/cpp/yt/threading/fork_aware_spin_lock.h> @@ -180,13 +178,6 @@ private: //////////////////////////////////////////////////////////////////////////////// -Y_FORCE_INLINE TMemoryTag SwapMemoryTag(TMemoryTag tag) -{ - auto result = GetCurrentMemoryTag(); - SetCurrentMemoryTag(tag); - return result; -} - Y_FORCE_INLINE TFiberId SwapCurrentFiberId(TFiberId fiberId) { auto result = GetCurrentFiberId(); @@ -818,7 +809,6 @@ protected: void OnSwitch() { FiberId_ = SwapCurrentFiberId(FiberId_); - MemoryTag_ = SwapMemoryTag(MemoryTag_); Fls_ = SwapCurrentFls(Fls_); MinLogLevel_ = SwapMinLogLevel(MinLogLevel_); } @@ -826,13 +816,11 @@ protected: ~TBaseSwitchHandler() { YT_VERIFY(FiberId_ == InvalidFiberId); - YT_VERIFY(MemoryTag_ == NullMemoryTag); YT_VERIFY(!Fls_); YT_VERIFY(MinLogLevel_ == ELogLevel::Minimum); } private: - TMemoryTag MemoryTag_ = NullMemoryTag; TFls* Fls_ = nullptr; TFiberId FiberId_ = InvalidFiberId; ELogLevel MinLogLevel_ = ELogLevel::Minimum; @@ -1147,7 +1135,6 @@ TFiberCanceler GetCurrentFiberCanceler() } if (!switchHandler->Canceler()) { - TMemoryTagGuard guard(NullMemoryTag); switchHandler->Canceler() = New<NDetail::TCanceler>(GetCurrentFiberId()); } @@ -1162,8 +1149,6 @@ void WaitUntilSet(TFuture<void> future, IInvokerPtr invoker) YT_VERIFY(future); YT_ASSERT(invoker); - TMemoryTagGuard memoryTagGuard(NullMemoryTag); - auto* currentFiber = NDetail::TryGetCurrentFiber(); if (!currentFiber) { // When called from a fiber-unfriendly context, we fallback to blocking wait. diff --git a/yt/yt/core/concurrency/fls-inl.h b/yt/yt/core/concurrency/fls-inl.h index 4f8eba6b6f..6267f7f054 100644 --- a/yt/yt/core/concurrency/fls-inl.h +++ b/yt/yt/core/concurrency/fls-inl.h @@ -5,8 +5,6 @@ #endif #undef FLS_INL_H_ -#include <library/cpp/yt/memory/memory_tag.h> - #include <library/cpp/yt/misc/tls.h> namespace NYT::NConcurrency { @@ -97,7 +95,6 @@ Y_FORCE_INLINE T* TFlsSlot<T>::GetOrCreate() const template <class T> T* TFlsSlot<T>::Create() const { - TMemoryTagGuard guard(NullMemoryTag); auto cookie = new T(); GetCurrentFls()->Set(Index_, cookie); return static_cast<T*>(cookie); diff --git a/yt/yt/core/concurrency/unittests/scheduler_ut.cpp b/yt/yt/core/concurrency/unittests/scheduler_ut.cpp index 15320fae90..957c1f2ddf 100644 --- a/yt/yt/core/concurrency/unittests/scheduler_ut.cpp +++ b/yt/yt/core/concurrency/unittests/scheduler_ut.cpp @@ -1042,31 +1042,6 @@ TEST_W(TSchedulerTest, CancelDelayedFuture) EXPECT_EQ(NYT::EErrorCode::Generic, error.InnerErrors()[0].GetCode()); } -class TVerifyingMemoryTagGuard -{ -public: - explicit TVerifyingMemoryTagGuard(TMemoryTag tag) - : Tag_(tag) - , SavedTag_(GetCurrentMemoryTag()) - { - SetCurrentMemoryTag(Tag_); - } - - ~TVerifyingMemoryTagGuard() - { - auto tag = GetCurrentMemoryTag(); - EXPECT_EQ(tag, Tag_); - SetCurrentMemoryTag(SavedTag_); - } - - TVerifyingMemoryTagGuard(const TVerifyingMemoryTagGuard& other) = delete; - TVerifyingMemoryTagGuard(TVerifyingMemoryTagGuard&& other) = delete; - -private: - const TMemoryTag Tag_; - const TMemoryTag SavedTag_; -}; - class TWrappingInvoker : public TInvokerWrapper<false> { @@ -1094,44 +1069,6 @@ public: void virtual DoRunCallback(TClosure callback) = 0; }; -class TVerifyingMemoryTaggingInvoker - : public TWrappingInvoker -{ -public: - TVerifyingMemoryTaggingInvoker(IInvokerPtr invoker, TMemoryTag memoryTag) - : TWrappingInvoker(std::move(invoker)) - , MemoryTag_(memoryTag) - { } - -private: - const TMemoryTag MemoryTag_; - - void DoRunCallback(TClosure callback) override - { - TVerifyingMemoryTagGuard memoryTagGuard(MemoryTag_); - callback(); - } -}; - -TEST_W(TSchedulerTest, MemoryTagAndResumer) -{ - auto actionQueue = New<TActionQueue>(); - - auto invoker1 = New<TVerifyingMemoryTaggingInvoker>(actionQueue->GetInvoker(), 1); - auto invoker2 = New<TVerifyingMemoryTaggingInvoker>(actionQueue->GetInvoker(), 2); - - auto asyncResult = BIND([=] { - EXPECT_EQ(GetCurrentMemoryTag(), 1u); - SwitchTo(invoker2); - EXPECT_EQ(GetCurrentMemoryTag(), 1u); - }) - .AsyncVia(invoker1) - .Run(); - - WaitFor(asyncResult) - .ThrowOnError(); -} - void CheckTraceContextTime(const NTracing::TTraceContextPtr& traceContext, TDuration lo, TDuration hi) { auto actual = traceContext->GetElapsedTime(); diff --git a/yt/yt/core/misc/ref_counted_tracker.cpp b/yt/yt/core/misc/ref_counted_tracker.cpp index 85d1e0fd1a..21f9eb815f 100644 --- a/yt/yt/core/misc/ref_counted_tracker.cpp +++ b/yt/yt/core/misc/ref_counted_tracker.cpp @@ -6,8 +6,6 @@ #include <library/cpp/yt/string/format.h> -#include <library/cpp/yt/memory/memory_tag.h> - #include <library/cpp/yt/misc/tls.h> #include <algorithm> @@ -402,8 +400,6 @@ void TRefCountedTracker::FreeSpaceSlow(TRefCountedTypeCookie cookie, size_t spac TRefCountedTracker::TLocalSlot* TRefCountedTracker::GetLocalSlot(TRefCountedTypeCookie cookie) { - TMemoryTagGuard memoryTagGuard(NullMemoryTag); - struct TReclaimer { ~TReclaimer() @@ -460,8 +456,6 @@ TRefCountedTracker::TLocalSlot* TRefCountedTracker::GetLocalSlot(TRefCountedType TRefCountedTracker::TGlobalSlot* TRefCountedTracker::GetGlobalSlot(TRefCountedTypeCookie cookie) { - TMemoryTagGuard memoryTagGuard(NullMemoryTag); - VERIFY_SPINLOCK_AFFINITY(SpinLock_); auto index = cookie.Underlying(); if (index >= std::ssize(GlobalSlots_)) { diff --git a/yt/yt/core/misc/unittests/memory_tag_ut.cpp b/yt/yt/core/misc/unittests/memory_tag_ut.cpp deleted file mode 100644 index f74563f64d..0000000000 --- a/yt/yt/core/misc/unittests/memory_tag_ut.cpp +++ /dev/null @@ -1,243 +0,0 @@ -#include <yt/yt/core/test_framework/framework.h> - -#include <yt/yt/core/actions/invoker_util.h> - -#include <yt/yt/core/concurrency/action_queue.h> -#include <yt/yt/core/concurrency/thread_pool.h> -#include <yt/yt/core/concurrency/scheduler.h> - -#include <library/cpp/yt/memory/memory_tag.h> - -#include <util/random/random.h> - -#include <util/system/compiler.h> - -// These tests do not work under MSAN and ASAN. -#if !defined(_msan_enabled_) and !defined(_asan_enabled_) and defined(_linux_) and defined(YT_ALLOC_ENABLED) - -namespace NYT { -namespace { - -//////////////////////////////////////////////////////////////////////////////// - -// Used for fake side effects to disable compiler optimizations. -volatile const void* FakeSideEffectVolatileVariable = nullptr; - -//////////////////////////////////////////////////////////////////////////////// - -using namespace NConcurrency; -using namespace ::testing; - -//////////////////////////////////////////////////////////////////////////////// - -class TMemoryTagTest - : public TestWithParam<void(*)()> -{ -public: - TMemoryTagTest() = default; -}; - -//////////////////////////////////////////////////////////////////////////////// - -// Allocate vector that results in exactly `size` memory usage considering the 16-byte header. -std::vector<char> MakeAllocation(size_t size) -{ - YT_VERIFY(IsPowerOf2(size)); - - auto result = std::vector<char>(size); - - // We make fake side effect to prevent any compiler optimizations here. - // (Clever compilers like to throw away our unused allocations). - FakeSideEffectVolatileVariable = result.data(); - return result; -} - -//////////////////////////////////////////////////////////////////////////////// - -void TestStackingGuards() -{ - TMemoryTagGuard guard1(1); - EXPECT_EQ(GetMemoryUsageForTag(1), 0u); - auto allocation1 = MakeAllocation(1 << 5); - EXPECT_EQ(GetMemoryUsageForTag(1), 1u << 5); - { - TMemoryTagGuard guard2(2); - auto allocation2 = MakeAllocation(1 << 6); - EXPECT_EQ(GetMemoryUsageForTag(1), 1u << 5); - EXPECT_EQ(GetMemoryUsageForTag(2), 1u << 6); - } - EXPECT_EQ(GetMemoryUsageForTag(1), 1u << 5); - EXPECT_EQ(GetMemoryUsageForTag(2), 0u); - { - TMemoryTagGuard guard2(std::move(guard1)); - auto allocation2 = MakeAllocation(1 << 7); - EXPECT_EQ(GetMemoryUsageForTag(1), (1u << 5) + (1u << 7)); - EXPECT_EQ(GetMemoryUsageForTag(2), 0u); - } - EXPECT_EQ(GetMemoryUsageForTag(1), (1u << 5)); - EXPECT_EQ(GetMemoryUsageForTag(2), 0u); -} - -//////////////////////////////////////////////////////////////////////////////// - -void Action1() -{ - TMemoryTagGuard guard(1); - Yield(); - auto allocation1 = MakeAllocation(1 << 5); - EXPECT_EQ(GetMemoryUsageForTag(1), 1u << 5); - Yield(); - auto allocation2 = MakeAllocation(1 << 7); - EXPECT_EQ(GetMemoryUsageForTag(1), (1u << 5) + (1u << 7)); - Yield(); - auto allocation3 = MakeAllocation(1 << 9); - EXPECT_EQ(GetMemoryUsageForTag(1), (1u << 5) + (1u << 7) + (1u << 9)); -} - -void Action2() -{ - TMemoryTagGuard guard(2); - Yield(); - auto allocation1 = MakeAllocation(1 << 6); - EXPECT_EQ(GetMemoryUsageForTag(2), 1u << 6); - Yield(); - auto allocation2 = MakeAllocation(1 << 8); - EXPECT_EQ(GetMemoryUsageForTag(2), (1u << 6) + (1u << 8)); - Yield(); - auto allocation3 = MakeAllocation(1 << 10); - EXPECT_EQ(GetMemoryUsageForTag(2), (1u << 6) + (1u << 8) + (1u << 10)); -} - -void TestSwitchingFibers() -{ - auto future1 = BIND(&Action1) - .AsyncVia(GetCurrentInvoker()) - .Run(); - auto future2 = BIND(&Action2) - .AsyncVia(GetCurrentInvoker()) - .Run(); - WaitFor(AllSucceeded(std::vector<TFuture<void>>{future1, future2})) - .ThrowOnError(); - EXPECT_EQ(GetMemoryUsageForTag(1), 0u); - EXPECT_EQ(GetMemoryUsageForTag(2), 0u); -} - -//////////////////////////////////////////////////////////////////////////////// - -class TMiniController - : public TRefCounted -{ -public: - TMiniController(IInvokerPtr controlInvoker, TMemoryTag memoryTag) - : MemoryTag_(memoryTag) - , Invoker_(CreateMemoryTaggingInvoker(CreateSerializedInvoker(std::move(controlInvoker)), MemoryTag_)) - { } - - ssize_t GetMemoryUsage() const - { - return GetMemoryUsageForTag(MemoryTag_); - } - - IInvokerPtr GetControlInvoker() const - { - return Invoker_; - } - - std::vector<std::vector<char>>& Allocations() - { - return Allocations_; - } - -private: - TMemoryTag MemoryTag_; - IInvokerPtr Invoker_; - std::vector<std::vector<char>> Allocations_; -}; - -DEFINE_REFCOUNTED_TYPE(TMiniController) -DECLARE_REFCOUNTED_CLASS(TMiniController) - -void Action3(TMiniControllerPtr controller) -{ - controller->Allocations().emplace_back(MakeAllocation(128_MB)); -} - -void TestMemoryTaggingInvoker() -{ - auto queue = New<TActionQueue>(); - auto controller = New<TMiniController>(queue->GetInvoker(), 1); - EXPECT_EQ(controller->GetMemoryUsage(), 0); - - WaitFor(BIND(&Action3, controller) - .AsyncVia(controller->GetControlInvoker()) - .Run()) - .ThrowOnError(); - EXPECT_NEAR(controller->GetMemoryUsage(), 128_MB, 1_MB); - - controller->Allocations().clear(); - controller->Allocations().shrink_to_fit(); - - EXPECT_NEAR(GetMemoryUsageForTag(1), 0, 1_MB); -} - -void TestControllersInThreadPool() -{ - std::vector<TMiniControllerPtr> controllers; - constexpr int controllerCount = 1000; - auto pool = CreateThreadPool(16, "TestPool"); - for (int index = 0; index < controllerCount; ++index) { - controllers.emplace_back(New<TMiniController>(pool->GetInvoker(), index + 1)); - } - constexpr int actionCount = 100 * 1000; - std::vector<TFuture<void>> futures; - std::vector<int> memoryUsages(controllerCount); - srand(42); - for (int index = 0; index < actionCount; ++index) { - int controllerIndex = rand() % controllerCount; - auto allocationSize = 1 << (5 + rand() % 10); - memoryUsages[controllerIndex] += allocationSize; - const auto& controller = controllers[controllerIndex]; - futures.emplace_back( - BIND([] (TMiniControllerPtr controller, int allocationSize) { - controller->Allocations().emplace_back(MakeAllocation(allocationSize)); - }, controller, allocationSize) - .AsyncVia(controller->GetControlInvoker()) - .Run()); - } - WaitFor(AllSucceeded(futures)) - .ThrowOnError(); - for (int index = 0; index < controllerCount; ++index) { - EXPECT_NEAR(memoryUsages[index], controllers[index]->GetMemoryUsage(), 10_KB); - } - controllers.clear(); - for (int index = 0; index < controllerCount; ++index) { - EXPECT_NEAR(GetMemoryUsageForTag(index + 1), 0, 10_KB); - EXPECT_GE(GetMemoryUsageForTag(index + 1), 0u); - } -} - -//////////////////////////////////////////////////////////////////////////////// - -TEST_P(TMemoryTagTest, Test) -{ - // We wrap anything with an outer action queue to make - // fiber-friendly environment. - auto outerQueue = New<TActionQueue>(); - WaitFor(BIND(GetParam()) - .AsyncVia(outerQueue->GetInvoker()) - .Run()) - .ThrowOnError(); -} - -INSTANTIATE_TEST_SUITE_P(MemoryTagTest, TMemoryTagTest, Values( - &TestStackingGuards, - &TestSwitchingFibers, - &TestMemoryTaggingInvoker, - &TestControllersInThreadPool)); - -//////////////////////////////////////////////////////////////////////////////// - -} // namespace -} // namespace NYT - -#endif // !defined(_msan_enabled_) diff --git a/yt/yt/core/misc/unittests/ya.make b/yt/yt/core/misc/unittests/ya.make index 62fb8ece13..a422838270 100644 --- a/yt/yt/core/misc/unittests/ya.make +++ b/yt/yt/core/misc/unittests/ya.make @@ -42,7 +42,6 @@ SRCS( lock_free_hash_table_ut.cpp lru_cache_ut.cpp maybe_inf_ut.cpp - memory_tag_ut.cpp moving_average_ut.cpp mpsc_fair_share_queue_ut.cpp mpsc_stack_ut.cpp diff --git a/yt/yt/core/rpc/unittests/rpc_allocation_tags_ut.cpp b/yt/yt/core/rpc/unittests/rpc_allocation_tags_ut.cpp index ec5c49d7cc..55dd20bde7 100644 --- a/yt/yt/core/rpc/unittests/rpc_allocation_tags_ut.cpp +++ b/yt/yt/core/rpc/unittests/rpc_allocation_tags_ut.cpp @@ -33,7 +33,7 @@ TYPED_TEST(TRpcTest, ResponseWithAllocationTags) auto previousLimit = memoryUsageTracker->GetLimit(); memoryUsageTracker->SetLimit(2_GB); - static TMemoryTag testMemoryTag = 1 << 20; + static int testMemoryTag = 1 << 20; testMemoryTag++; EnableMemoryProfilingTags(); @@ -70,11 +70,11 @@ TYPED_TEST(TRpcTest, ResponseWithAllocationTags) req2->set_size(size); auto rspFutureProp = req2->Invoke() - .Apply(BIND([testMemoryTag=testMemoryTag] (const TRspPtr& res) { + .Apply(BIND([testMemoryTag = testMemoryTag] (const TRspPtr& res) { auto localContext = TryGetCurrentTraceContext(); EXPECT_NE(localContext, nullptr); if (localContext) { - EXPECT_EQ(localContext->FindAllocationTag<TMemoryTag>(MemoryAllocationTag).value_or(NullMemoryTag), testMemoryTag); + EXPECT_EQ(localContext->FindAllocationTag<int>(MemoryAllocationTag).value_or(NullMemoryTag), testMemoryTag); } return res; }).AsyncVia(actionQueue->GetInvoker())); diff --git a/yt/yt/core/tracing/unittests/allocation_tags_ut.cpp b/yt/yt/core/tracing/unittests/allocation_tags_ut.cpp index 9c567309ee..86a645ea5f 100644 --- a/yt/yt/core/tracing/unittests/allocation_tags_ut.cpp +++ b/yt/yt/core/tracing/unittests/allocation_tags_ut.cpp @@ -16,7 +16,7 @@ TEST(TAllocationTagsTest, GetSetAllocationTags) ASSERT_EQ(traceContext->FindAllocationTag<std::string>("a"), std::nullopt); traceContext->SetAllocationTags({{"user", "first"}, {"sometag", "my"}}); - ASSERT_EQ(traceContext->FindAllocationTag<TMemoryTag>("memory_tag"), std::nullopt); + ASSERT_EQ(traceContext->FindAllocationTag<int>("memory_tag"), std::nullopt); ASSERT_EQ(traceContext->FindAllocationTag<std::string>("user"), "first"); ASSERT_EQ(traceContext->FindAllocationTag<std::string>("sometag"), "my"); ASSERT_EQ(traceContext->FindAllocationTag<std::string>("other"), std::nullopt); @@ -35,8 +35,8 @@ TEST(TAllocationTagsTest, GetSetAllocationTags) ASSERT_EQ(traceContext->FindAllocationTag<std::string>("sometag"), std::nullopt); ASSERT_TRUE(traceContext->GetAllocationTags().empty()); - traceContext->SetAllocationTag<TMemoryTag>("memory_tag", TMemoryTag{1}); - ASSERT_EQ(traceContext->FindAllocationTag<TMemoryTag>("memory_tag"), TMemoryTag{1}); + traceContext->SetAllocationTag<int>("memory_tag", 1); + ASSERT_EQ(traceContext->FindAllocationTag<int>("memory_tag"), 1); ASSERT_FALSE(traceContext->GetAllocationTags().empty()); } diff --git a/yt/yt/library/ytprof/unittests/heap_profiler_ut.cpp b/yt/yt/library/ytprof/unittests/heap_profiler_ut.cpp index 6935216130..a7a19b8eb9 100644 --- a/yt/yt/library/ytprof/unittests/heap_profiler_ut.cpp +++ b/yt/yt/library/ytprof/unittests/heap_profiler_ut.cpp @@ -64,9 +64,9 @@ TEST(THeapProfilerTest, ReadProfile) auto h0 = BlowHeap<0>(); - auto tag = TMemoryTag(1); + int tag = 1; traceContext->SetAllocationTags({{"user", "second"}, {"sometag", "notmy"}, {MemoryAllocationTagKey, ToString(tag)}}); - auto currentTag = traceContext->FindAllocationTag<TMemoryTag>(MemoryAllocationTagKey); + auto currentTag = traceContext->FindAllocationTag<int>(MemoryAllocationTagKey); ASSERT_EQ(currentTag, tag); auto h1 = BlowHeap<1>(); @@ -99,7 +99,7 @@ TEST(THeapProfilerTest, ReadProfile) output.Finish(); } -TEST(THeapProfilerTest, AllocationTagsWithMemoryTag) +TEST(THeapProfilerTest, AllocationTags) { EnableMemoryProfilingTags(); auto traceContext = TTraceContext::NewRoot("Root"); @@ -201,7 +201,7 @@ TEST(THeapProfilerTest, HugeAllocationsTagsWithMemoryTag) heap.push_back(BlowHeap<0>()); traceContext->SetAllocationTag(MemoryAllocationTagKey, MemoryAllocationTagValues[1]); - ASSERT_EQ(traceContext->FindAllocationTag<TMemoryTag>(MemoryAllocationTagKey), 1); + ASSERT_EQ(traceContext->FindAllocationTag<int>(MemoryAllocationTagKey), 1); heap.push_back(BlowHeap<1>(100)); |