diff options
author | ni-stoiko <ni-stoiko@yandex-team.com> | 2023-08-30 12:26:15 +0300 |
---|---|---|
committer | ni-stoiko <ni-stoiko@yandex-team.com> | 2023-08-30 13:00:19 +0300 |
commit | 7ccc4cc77189900b84644f90037c7490e05ee881 (patch) | |
tree | 0dabceb885541eb6ba5c7aac2af383cd8b7e885a /yt | |
parent | cb3da9494c53283f0230ad37e4e8d0ea61b7d8fc (diff) | |
download | ydb-7ccc4cc77189900b84644f90037c7490e05ee881.tar.gz |
YT-19556: Removing usage of MemoryTag
Clean RPC from CurrentMemoryTag. Update rpc_allocation_tags_ut
Restore memory_tag_ut.cpp
Remove MemoryTag from core
Diffstat (limited to 'yt')
-rw-r--r-- | yt/yt/core/actions/bind-inl.h | 25 | ||||
-rw-r--r-- | yt/yt/core/concurrency/action_queue.cpp | 37 | ||||
-rw-r--r-- | yt/yt/core/misc/unittests/memory_tag_ut.cpp | 112 | ||||
-rw-r--r-- | yt/yt/core/rpc/helpers.cpp | 5 | ||||
-rw-r--r-- | yt/yt/core/rpc/unittests/rpc_allocation_tags_ut.cpp | 19 | ||||
-rw-r--r-- | yt/yt/library/ytprof/heap_profiler.cpp | 14 | ||||
-rw-r--r-- | yt/yt/library/ytprof/heap_profiler.h | 4 |
7 files changed, 25 insertions, 191 deletions
diff --git a/yt/yt/core/actions/bind-inl.h b/yt/yt/core/actions/bind-inl.h index 0633a5712f..f88d3116d9 100644 --- a/yt/yt/core/actions/bind-inl.h +++ b/yt/yt/core/actions/bind-inl.h @@ -7,10 +7,6 @@ #include <yt/yt/core/concurrency/propagating_storage.h> -#include <yt/yt/core/tracing/trace_context.h> - -#include <library/cpp/yt/memory/memory_tag.h> - namespace NYT { //////////////////////////////////////////////////////////////////////////////// @@ -508,27 +504,12 @@ public: auto* volatile unoptimizedState = state; Y_UNUSED(unoptimizedState); - auto memoryTag = GetCurrentMemoryTag(); - auto propagatingStorageGuard = state->MakePropagatingStorageGuard(); Y_UNUSED(propagatingStorageGuard); - if (memoryTag != NullMemoryTag) { - auto traceContext = NTracing::GetOrCreateTraceContext("BindMemoryTag"); - - // Does NOT finish the trace context upon destruction. - NTracing::TCurrentTraceContextGuard contextGuard(traceContext); - - traceContext->SetAllocationTag(NTracing::MemoryTagLiteral, memoryTag); - - return state->Functor( - NDetail::Unwrap(std::get<BoundIndexes>(state->BoundArgs))..., - std::forward<TAs>(args)...); - } else { - return state->Functor( - NDetail::Unwrap(std::get<BoundIndexes>(state->BoundArgs))..., - std::forward<TAs>(args)...); - } + return state->Functor( + NDetail::Unwrap(std::get<BoundIndexes>(state->BoundArgs))..., + std::forward<TAs>(args)...); } private: diff --git a/yt/yt/core/concurrency/action_queue.cpp b/yt/yt/core/concurrency/action_queue.cpp index aa42272a75..14f9d2d8e8 100644 --- a/yt/yt/core/concurrency/action_queue.cpp +++ b/yt/yt/core/concurrency/action_queue.cpp @@ -15,8 +15,6 @@ #include <yt/yt/core/misc/ring_queue.h> #include <yt/yt/core/misc/shutdown.h> -#include <library/cpp/yt/memory/memory_tag.h> - #include <util/thread/lfqueue.h> namespace NYT::NConcurrency { @@ -651,41 +649,6 @@ ISuspendableInvokerPtr CreateSuspendableInvoker(IInvokerPtr underlyingInvoker) //////////////////////////////////////////////////////////////////////////////// -class TMemoryTaggingInvoker - : public TInvokerWrapper -{ -public: - TMemoryTaggingInvoker(IInvokerPtr invoker, TMemoryTag memoryTag) - : TInvokerWrapper(std::move(invoker)) - , MemoryTag_(memoryTag) - { } - - void Invoke(TClosure callback) override - { - UnderlyingInvoker_->Invoke(BIND_NO_PROPAGATE( - &TMemoryTaggingInvoker::RunCallback, - MakeStrong(this), - Passed(std::move(callback)))); - } - -private: - TMemoryTag MemoryTag_; - - void RunCallback(TClosure callback) - { - TCurrentInvokerGuard currentInvokerGuard(this); - TMemoryTagGuard memoryTagGuard(MemoryTag_); - callback(); - } -}; - -IInvokerPtr CreateMemoryTaggingInvoker(IInvokerPtr underlyingInvoker, TMemoryTag tag) -{ - return New<TMemoryTaggingInvoker>(std::move(underlyingInvoker), tag); -} - -//////////////////////////////////////////////////////////////////////////////// - class TCodicilGuardedInvoker : public TInvokerWrapper { diff --git a/yt/yt/core/misc/unittests/memory_tag_ut.cpp b/yt/yt/core/misc/unittests/memory_tag_ut.cpp index 06f2e240f1..346cb86e0b 100644 --- a/yt/yt/core/misc/unittests/memory_tag_ut.cpp +++ b/yt/yt/core/misc/unittests/memory_tag_ut.cpp @@ -6,17 +6,15 @@ #include <yt/yt/core/concurrency/thread_pool.h> #include <yt/yt/core/concurrency/scheduler.h> -#include <yt/yt/core/misc/lazy_ptr.h> - -#include <yt/yt/core/tracing/allocation_tags.h> -#include <yt/yt/core/tracing/trace_context.h> - #include <library/cpp/yt/memory/memory_tag.h> #include <util/random/random.h> #include <util/system/compiler.h> +// These tests do not work under MSAN and ASAN. +#if !defined(_msan_enabled_) and !defined(_asan_enabled_) and defined(_linux_) and defined(YT_ALLOC_ENABLED) + namespace NYT { //////////////////////////////////////////////////////////////////////////////// @@ -24,12 +22,13 @@ namespace NYT { // Used for fake side effects to disable compiler optimizations. volatile const void* FakeSideEffectVolatileVariable = nullptr; +//////////////////////////////////////////////////////////////////////////////// + +namespace { + using namespace NConcurrency; using namespace ::testing; -// These tests do not work under MSAN and ASAN. -#if !defined(_msan_enabled_) and !defined(_asan_enabled_) and defined(_linux_) and defined(YT_ALLOC_ENABLED) - //////////////////////////////////////////////////////////////////////////////// class TMemoryTagTest @@ -239,98 +238,7 @@ INSTANTIATE_TEST_SUITE_P(MemoryTagTest, TMemoryTagTest, Values( //////////////////////////////////////////////////////////////////////////////// -#endif // !defined(_msan_enabled_) - -//////////////////////////////////////////////////////////////////////////////// - -using namespace NTracing; - -TEST(TMemoryTagTest, MemoryTagPropagationViaAllocationTags) -{ - auto localContext = CreateTraceContextFromCurrent("MemoryTagPropagation"); - auto localTag = 1u; - - localContext->SetAllocationTags({ - {MemoryTagLiteral, ToString(localTag)} - }); - - auto guard = TCurrentTraceContextGuard(localContext); - - auto actionQueue = New<TActionQueue>(); - - auto tag = 2u; - auto invoker = CreateMemoryTaggingInvoker(actionQueue->GetInvoker(), tag); - auto currentTag = TryGetCurrentTraceContext()->FindAllocationTag<TMemoryTag>(MemoryTagLiteral); - EXPECT_EQ(currentTag, localTag); - - auto asyncResult = BIND_NO_PROPAGATE([=] { - auto currentTag = TryGetCurrentTraceContext()->FindAllocationTag<TMemoryTag>(MemoryTagLiteral); - EXPECT_EQ(currentTag, tag); - }) - .AsyncVia(invoker) - .Run(); - - WaitFor(asyncResult) - .ThrowOnError(); - - currentTag = TryGetCurrentTraceContext()->FindAllocationTag<TMemoryTag>(MemoryTagLiteral); - EXPECT_EQ(currentTag, localTag); -} - -void TestYield(TMemoryTag tag) -{ - auto currentTag = TryGetCurrentTraceContext()->FindAllocationTag<TMemoryTag>(MemoryTagLiteral); - EXPECT_EQ(currentTag, tag); - - Yield(); - currentTag = TryGetCurrentTraceContext()->FindAllocationTag<TMemoryTag>(MemoryTagLiteral); - EXPECT_EQ(currentTag, tag); - - Yield(); - currentTag = TryGetCurrentTraceContext()->FindAllocationTag<TMemoryTag>(MemoryTagLiteral); - EXPECT_EQ(currentTag, tag); -} - -TEST(TMemoryTagTest, MemoryTagWithYieldContextPropagation) -{ - auto localContext = CreateTraceContextFromCurrent("MemoryTagSwitchContextPropagation"); - auto localTag = 1u; - - localContext->SetAllocationTags({ - {MemoryTagLiteral, ToString(localTag)} - }); - - auto guard = TCurrentTraceContextGuard(localContext); - - auto actionQueue = New<TActionQueue>(); - - auto tag1 = 222u; - auto tag2 = 333u; - - auto invoker1 = CreateMemoryTaggingInvoker(actionQueue->GetInvoker(), tag1); - auto invoker2 = CreateMemoryTaggingInvoker(actionQueue->GetInvoker(), tag2); - auto currentTag = TryGetCurrentTraceContext()->FindAllocationTag<TMemoryTag>(MemoryTagLiteral); - EXPECT_EQ(currentTag, localTag); - - // Use BIND_NO_PROPAGATE in order not overwrite tags in localContext. - - auto asyncResult1 = BIND_NO_PROPAGATE(TestYield) - .AsyncVia(invoker1) - .Run(tag1); - - auto asyncResult2 = BIND_NO_PROPAGATE(TestYield) - .AsyncVia(invoker2) - .Run(tag2); - - WaitFor(asyncResult1) - .ThrowOnError(); - WaitFor(asyncResult2) - .ThrowOnError(); - - currentTag = TryGetCurrentTraceContext()->FindAllocationTag<TMemoryTag>(MemoryTagLiteral); - EXPECT_EQ(currentTag, localTag); -} - -//////////////////////////////////////////////////////////////////////////////// - +} // namespace } // namespace NYT + +#endif // !defined(_msan_enabled_) diff --git a/yt/yt/core/rpc/helpers.cpp b/yt/yt/core/rpc/helpers.cpp index a881ac49a4..d23f2995ad 100644 --- a/yt/yt/core/rpc/helpers.cpp +++ b/yt/yt/core/rpc/helpers.cpp @@ -434,13 +434,8 @@ TTraceContextPtr CreateCallTraceContext(std::string service, std::string method) } auto traceContext = oldTraceContext->CreateChild(Format("RpcClient:%v.%v", service, method)); - traceContext->SetAllocationTagsPtr(oldTraceContext->GetAllocationTagsPtr()); - if (GetCurrentMemoryTag() && !traceContext->FindAllocationTag<TMemoryTag>(NTracing::MemoryTagLiteral)) { - traceContext->SetAllocationTag<TMemoryTag>(NTracing::MemoryTagLiteral, GetCurrentMemoryTag()); - } - return traceContext; } diff --git a/yt/yt/core/rpc/unittests/rpc_allocation_tags_ut.cpp b/yt/yt/core/rpc/unittests/rpc_allocation_tags_ut.cpp index b78d4aa79e..57d25c70d0 100644 --- a/yt/yt/core/rpc/unittests/rpc_allocation_tags_ut.cpp +++ b/yt/yt/core/rpc/unittests/rpc_allocation_tags_ut.cpp @@ -27,7 +27,7 @@ TYPED_TEST(TRpcTest, ResponseWithAllocationTags) NYTProf::EnableMemoryProfilingTags(); - auto initialMemoryUsage = GetMemoryUsageForTag(testMemoryTag); + auto initialMemoryUsage = NYTProf::GetEstimatedMemoryUsage()[testMemoryTag]; auto actionQueue = New<TActionQueue>(); @@ -36,8 +36,9 @@ TYPED_TEST(TRpcTest, ResponseWithAllocationTags) TTestProxy proxy(this->CreateChannel()); - constexpr auto size = 1_MB; - for (int i = 0; i < 10; ++i) { + constexpr auto size = 4_MB - 1_KB; + constexpr auto numberOfLoops = 10; + for (int i = 0; i < numberOfLoops; ++i) { auto context = CreateTraceContextFromCurrent("ResponseWithAllocationTags"); auto contextGuard = TTraceContextGuard(context); context->SetAllocationTag(MemoryTagLiteral, testMemoryTag); @@ -67,15 +68,19 @@ TYPED_TEST(TRpcTest, ResponseWithAllocationTags) responses.push_back(rspFutureProp); } - for (auto& rsp : responses) { + auto memoryUsageBefore = NYTProf::GetEstimatedMemoryUsage()[testMemoryTag]; + EXPECT_LE(memoryUsageBefore, numberOfLoops * 1536_KB); + + for (const auto& rsp : responses) { WaitFor(rsp).ValueOrThrow(); } auto memoryUsageAfter = NYTProf::GetEstimatedMemoryUsage()[testMemoryTag]; - auto deltaMemoryUsage = memoryUsageAfter - initialMemoryUsage; - EXPECT_GE(deltaMemoryUsage, 14_MB) + auto deltaMemoryUsage = memoryUsageAfter - initialMemoryUsage - memoryUsageBefore; + EXPECT_GE(deltaMemoryUsage, numberOfLoops * size * 6 / 5) << "InitialUsage: " << initialMemoryUsage << std::endl - << "After waiting: " << memoryUsageAfter; + << "MemoryUsage before waiting: " << memoryUsageBefore << std::endl + << "MemoryUsage after waiting: " << memoryUsageAfter; } #endif diff --git a/yt/yt/library/ytprof/heap_profiler.cpp b/yt/yt/library/ytprof/heap_profiler.cpp index 2fecf95473..9d681b767d 100644 --- a/yt/yt/library/ytprof/heap_profiler.cpp +++ b/yt/yt/library/ytprof/heap_profiler.cpp @@ -180,20 +180,6 @@ THashMap<TMemoryTag, ui64> GetEstimatedMemoryUsage() return usage; } -static thread_local TMemoryTag MemoryTag = 0; - -TMemoryTag GetMemoryTag() -{ - return MemoryTag; -} - -TMemoryTag SetMemoryTag(TMemoryTag newTag) -{ - auto oldTag = MemoryTag; - MemoryTag = newTag; - return oldTag; -} - struct TMemoryUsageSnapshot { TSpinLock Lock; diff --git a/yt/yt/library/ytprof/heap_profiler.h b/yt/yt/library/ytprof/heap_profiler.h index 31489987fe..455e710adc 100644 --- a/yt/yt/library/ytprof/heap_profiler.h +++ b/yt/yt/library/ytprof/heap_profiler.h @@ -23,10 +23,6 @@ int AbslStackUnwinder(void** frames, int*, using TMemoryTag = uintptr_t; -TMemoryTag GetMemoryTag(); - -TMemoryTag SetMemoryTag(TMemoryTag newTag); - THashMap<TMemoryTag, ui64> GetEstimatedMemoryUsage(); void UpdateMemoryUsageSnapshot(THashMap<TMemoryTag, ui64> usageSnapshot); |