aboutsummaryrefslogtreecommitdiffstats
path: root/yt
diff options
context:
space:
mode:
authorni-stoiko <ni-stoiko@yandex-team.com>2023-08-30 12:26:15 +0300
committerni-stoiko <ni-stoiko@yandex-team.com>2023-08-30 13:00:19 +0300
commit7ccc4cc77189900b84644f90037c7490e05ee881 (patch)
tree0dabceb885541eb6ba5c7aac2af383cd8b7e885a /yt
parentcb3da9494c53283f0230ad37e4e8d0ea61b7d8fc (diff)
downloadydb-7ccc4cc77189900b84644f90037c7490e05ee881.tar.gz
YT-19556: Removing usage of MemoryTag
Clean RPC from CurrentMemoryTag. Update rpc_allocation_tags_ut Restore memory_tag_ut.cpp Remove MemoryTag from core
Diffstat (limited to 'yt')
-rw-r--r--yt/yt/core/actions/bind-inl.h25
-rw-r--r--yt/yt/core/concurrency/action_queue.cpp37
-rw-r--r--yt/yt/core/misc/unittests/memory_tag_ut.cpp112
-rw-r--r--yt/yt/core/rpc/helpers.cpp5
-rw-r--r--yt/yt/core/rpc/unittests/rpc_allocation_tags_ut.cpp19
-rw-r--r--yt/yt/library/ytprof/heap_profiler.cpp14
-rw-r--r--yt/yt/library/ytprof/heap_profiler.h4
7 files changed, 25 insertions, 191 deletions
diff --git a/yt/yt/core/actions/bind-inl.h b/yt/yt/core/actions/bind-inl.h
index 0633a5712f..f88d3116d9 100644
--- a/yt/yt/core/actions/bind-inl.h
+++ b/yt/yt/core/actions/bind-inl.h
@@ -7,10 +7,6 @@
#include <yt/yt/core/concurrency/propagating_storage.h>
-#include <yt/yt/core/tracing/trace_context.h>
-
-#include <library/cpp/yt/memory/memory_tag.h>
-
namespace NYT {
////////////////////////////////////////////////////////////////////////////////
@@ -508,27 +504,12 @@ public:
auto* volatile unoptimizedState = state;
Y_UNUSED(unoptimizedState);
- auto memoryTag = GetCurrentMemoryTag();
-
auto propagatingStorageGuard = state->MakePropagatingStorageGuard();
Y_UNUSED(propagatingStorageGuard);
- if (memoryTag != NullMemoryTag) {
- auto traceContext = NTracing::GetOrCreateTraceContext("BindMemoryTag");
-
- // Does NOT finish the trace context upon destruction.
- NTracing::TCurrentTraceContextGuard contextGuard(traceContext);
-
- traceContext->SetAllocationTag(NTracing::MemoryTagLiteral, memoryTag);
-
- return state->Functor(
- NDetail::Unwrap(std::get<BoundIndexes>(state->BoundArgs))...,
- std::forward<TAs>(args)...);
- } else {
- return state->Functor(
- NDetail::Unwrap(std::get<BoundIndexes>(state->BoundArgs))...,
- std::forward<TAs>(args)...);
- }
+ return state->Functor(
+ NDetail::Unwrap(std::get<BoundIndexes>(state->BoundArgs))...,
+ std::forward<TAs>(args)...);
}
private:
diff --git a/yt/yt/core/concurrency/action_queue.cpp b/yt/yt/core/concurrency/action_queue.cpp
index aa42272a75..14f9d2d8e8 100644
--- a/yt/yt/core/concurrency/action_queue.cpp
+++ b/yt/yt/core/concurrency/action_queue.cpp
@@ -15,8 +15,6 @@
#include <yt/yt/core/misc/ring_queue.h>
#include <yt/yt/core/misc/shutdown.h>
-#include <library/cpp/yt/memory/memory_tag.h>
-
#include <util/thread/lfqueue.h>
namespace NYT::NConcurrency {
@@ -651,41 +649,6 @@ ISuspendableInvokerPtr CreateSuspendableInvoker(IInvokerPtr underlyingInvoker)
////////////////////////////////////////////////////////////////////////////////
-class TMemoryTaggingInvoker
- : public TInvokerWrapper
-{
-public:
- TMemoryTaggingInvoker(IInvokerPtr invoker, TMemoryTag memoryTag)
- : TInvokerWrapper(std::move(invoker))
- , MemoryTag_(memoryTag)
- { }
-
- void Invoke(TClosure callback) override
- {
- UnderlyingInvoker_->Invoke(BIND_NO_PROPAGATE(
- &TMemoryTaggingInvoker::RunCallback,
- MakeStrong(this),
- Passed(std::move(callback))));
- }
-
-private:
- TMemoryTag MemoryTag_;
-
- void RunCallback(TClosure callback)
- {
- TCurrentInvokerGuard currentInvokerGuard(this);
- TMemoryTagGuard memoryTagGuard(MemoryTag_);
- callback();
- }
-};
-
-IInvokerPtr CreateMemoryTaggingInvoker(IInvokerPtr underlyingInvoker, TMemoryTag tag)
-{
- return New<TMemoryTaggingInvoker>(std::move(underlyingInvoker), tag);
-}
-
-////////////////////////////////////////////////////////////////////////////////
-
class TCodicilGuardedInvoker
: public TInvokerWrapper
{
diff --git a/yt/yt/core/misc/unittests/memory_tag_ut.cpp b/yt/yt/core/misc/unittests/memory_tag_ut.cpp
index 06f2e240f1..346cb86e0b 100644
--- a/yt/yt/core/misc/unittests/memory_tag_ut.cpp
+++ b/yt/yt/core/misc/unittests/memory_tag_ut.cpp
@@ -6,17 +6,15 @@
#include <yt/yt/core/concurrency/thread_pool.h>
#include <yt/yt/core/concurrency/scheduler.h>
-#include <yt/yt/core/misc/lazy_ptr.h>
-
-#include <yt/yt/core/tracing/allocation_tags.h>
-#include <yt/yt/core/tracing/trace_context.h>
-
#include <library/cpp/yt/memory/memory_tag.h>
#include <util/random/random.h>
#include <util/system/compiler.h>
+// These tests do not work under MSAN and ASAN.
+#if !defined(_msan_enabled_) and !defined(_asan_enabled_) and defined(_linux_) and defined(YT_ALLOC_ENABLED)
+
namespace NYT {
////////////////////////////////////////////////////////////////////////////////
@@ -24,12 +22,13 @@ namespace NYT {
// Used for fake side effects to disable compiler optimizations.
volatile const void* FakeSideEffectVolatileVariable = nullptr;
+////////////////////////////////////////////////////////////////////////////////
+
+namespace {
+
using namespace NConcurrency;
using namespace ::testing;
-// These tests do not work under MSAN and ASAN.
-#if !defined(_msan_enabled_) and !defined(_asan_enabled_) and defined(_linux_) and defined(YT_ALLOC_ENABLED)
-
////////////////////////////////////////////////////////////////////////////////
class TMemoryTagTest
@@ -239,98 +238,7 @@ INSTANTIATE_TEST_SUITE_P(MemoryTagTest, TMemoryTagTest, Values(
////////////////////////////////////////////////////////////////////////////////
-#endif // !defined(_msan_enabled_)
-
-////////////////////////////////////////////////////////////////////////////////
-
-using namespace NTracing;
-
-TEST(TMemoryTagTest, MemoryTagPropagationViaAllocationTags)
-{
- auto localContext = CreateTraceContextFromCurrent("MemoryTagPropagation");
- auto localTag = 1u;
-
- localContext->SetAllocationTags({
- {MemoryTagLiteral, ToString(localTag)}
- });
-
- auto guard = TCurrentTraceContextGuard(localContext);
-
- auto actionQueue = New<TActionQueue>();
-
- auto tag = 2u;
- auto invoker = CreateMemoryTaggingInvoker(actionQueue->GetInvoker(), tag);
- auto currentTag = TryGetCurrentTraceContext()->FindAllocationTag<TMemoryTag>(MemoryTagLiteral);
- EXPECT_EQ(currentTag, localTag);
-
- auto asyncResult = BIND_NO_PROPAGATE([=] {
- auto currentTag = TryGetCurrentTraceContext()->FindAllocationTag<TMemoryTag>(MemoryTagLiteral);
- EXPECT_EQ(currentTag, tag);
- })
- .AsyncVia(invoker)
- .Run();
-
- WaitFor(asyncResult)
- .ThrowOnError();
-
- currentTag = TryGetCurrentTraceContext()->FindAllocationTag<TMemoryTag>(MemoryTagLiteral);
- EXPECT_EQ(currentTag, localTag);
-}
-
-void TestYield(TMemoryTag tag)
-{
- auto currentTag = TryGetCurrentTraceContext()->FindAllocationTag<TMemoryTag>(MemoryTagLiteral);
- EXPECT_EQ(currentTag, tag);
-
- Yield();
- currentTag = TryGetCurrentTraceContext()->FindAllocationTag<TMemoryTag>(MemoryTagLiteral);
- EXPECT_EQ(currentTag, tag);
-
- Yield();
- currentTag = TryGetCurrentTraceContext()->FindAllocationTag<TMemoryTag>(MemoryTagLiteral);
- EXPECT_EQ(currentTag, tag);
-}
-
-TEST(TMemoryTagTest, MemoryTagWithYieldContextPropagation)
-{
- auto localContext = CreateTraceContextFromCurrent("MemoryTagSwitchContextPropagation");
- auto localTag = 1u;
-
- localContext->SetAllocationTags({
- {MemoryTagLiteral, ToString(localTag)}
- });
-
- auto guard = TCurrentTraceContextGuard(localContext);
-
- auto actionQueue = New<TActionQueue>();
-
- auto tag1 = 222u;
- auto tag2 = 333u;
-
- auto invoker1 = CreateMemoryTaggingInvoker(actionQueue->GetInvoker(), tag1);
- auto invoker2 = CreateMemoryTaggingInvoker(actionQueue->GetInvoker(), tag2);
- auto currentTag = TryGetCurrentTraceContext()->FindAllocationTag<TMemoryTag>(MemoryTagLiteral);
- EXPECT_EQ(currentTag, localTag);
-
- // Use BIND_NO_PROPAGATE in order not overwrite tags in localContext.
-
- auto asyncResult1 = BIND_NO_PROPAGATE(TestYield)
- .AsyncVia(invoker1)
- .Run(tag1);
-
- auto asyncResult2 = BIND_NO_PROPAGATE(TestYield)
- .AsyncVia(invoker2)
- .Run(tag2);
-
- WaitFor(asyncResult1)
- .ThrowOnError();
- WaitFor(asyncResult2)
- .ThrowOnError();
-
- currentTag = TryGetCurrentTraceContext()->FindAllocationTag<TMemoryTag>(MemoryTagLiteral);
- EXPECT_EQ(currentTag, localTag);
-}
-
-////////////////////////////////////////////////////////////////////////////////
-
+} // namespace
} // namespace NYT
+
+#endif // !defined(_msan_enabled_)
diff --git a/yt/yt/core/rpc/helpers.cpp b/yt/yt/core/rpc/helpers.cpp
index a881ac49a4..d23f2995ad 100644
--- a/yt/yt/core/rpc/helpers.cpp
+++ b/yt/yt/core/rpc/helpers.cpp
@@ -434,13 +434,8 @@ TTraceContextPtr CreateCallTraceContext(std::string service, std::string method)
}
auto traceContext = oldTraceContext->CreateChild(Format("RpcClient:%v.%v", service, method));
-
traceContext->SetAllocationTagsPtr(oldTraceContext->GetAllocationTagsPtr());
- if (GetCurrentMemoryTag() && !traceContext->FindAllocationTag<TMemoryTag>(NTracing::MemoryTagLiteral)) {
- traceContext->SetAllocationTag<TMemoryTag>(NTracing::MemoryTagLiteral, GetCurrentMemoryTag());
- }
-
return traceContext;
}
diff --git a/yt/yt/core/rpc/unittests/rpc_allocation_tags_ut.cpp b/yt/yt/core/rpc/unittests/rpc_allocation_tags_ut.cpp
index b78d4aa79e..57d25c70d0 100644
--- a/yt/yt/core/rpc/unittests/rpc_allocation_tags_ut.cpp
+++ b/yt/yt/core/rpc/unittests/rpc_allocation_tags_ut.cpp
@@ -27,7 +27,7 @@ TYPED_TEST(TRpcTest, ResponseWithAllocationTags)
NYTProf::EnableMemoryProfilingTags();
- auto initialMemoryUsage = GetMemoryUsageForTag(testMemoryTag);
+ auto initialMemoryUsage = NYTProf::GetEstimatedMemoryUsage()[testMemoryTag];
auto actionQueue = New<TActionQueue>();
@@ -36,8 +36,9 @@ TYPED_TEST(TRpcTest, ResponseWithAllocationTags)
TTestProxy proxy(this->CreateChannel());
- constexpr auto size = 1_MB;
- for (int i = 0; i < 10; ++i) {
+ constexpr auto size = 4_MB - 1_KB;
+ constexpr auto numberOfLoops = 10;
+ for (int i = 0; i < numberOfLoops; ++i) {
auto context = CreateTraceContextFromCurrent("ResponseWithAllocationTags");
auto contextGuard = TTraceContextGuard(context);
context->SetAllocationTag(MemoryTagLiteral, testMemoryTag);
@@ -67,15 +68,19 @@ TYPED_TEST(TRpcTest, ResponseWithAllocationTags)
responses.push_back(rspFutureProp);
}
- for (auto& rsp : responses) {
+ auto memoryUsageBefore = NYTProf::GetEstimatedMemoryUsage()[testMemoryTag];
+ EXPECT_LE(memoryUsageBefore, numberOfLoops * 1536_KB);
+
+ for (const auto& rsp : responses) {
WaitFor(rsp).ValueOrThrow();
}
auto memoryUsageAfter = NYTProf::GetEstimatedMemoryUsage()[testMemoryTag];
- auto deltaMemoryUsage = memoryUsageAfter - initialMemoryUsage;
- EXPECT_GE(deltaMemoryUsage, 14_MB)
+ auto deltaMemoryUsage = memoryUsageAfter - initialMemoryUsage - memoryUsageBefore;
+ EXPECT_GE(deltaMemoryUsage, numberOfLoops * size * 6 / 5)
<< "InitialUsage: " << initialMemoryUsage << std::endl
- << "After waiting: " << memoryUsageAfter;
+ << "MemoryUsage before waiting: " << memoryUsageBefore << std::endl
+ << "MemoryUsage after waiting: " << memoryUsageAfter;
}
#endif
diff --git a/yt/yt/library/ytprof/heap_profiler.cpp b/yt/yt/library/ytprof/heap_profiler.cpp
index 2fecf95473..9d681b767d 100644
--- a/yt/yt/library/ytprof/heap_profiler.cpp
+++ b/yt/yt/library/ytprof/heap_profiler.cpp
@@ -180,20 +180,6 @@ THashMap<TMemoryTag, ui64> GetEstimatedMemoryUsage()
return usage;
}
-static thread_local TMemoryTag MemoryTag = 0;
-
-TMemoryTag GetMemoryTag()
-{
- return MemoryTag;
-}
-
-TMemoryTag SetMemoryTag(TMemoryTag newTag)
-{
- auto oldTag = MemoryTag;
- MemoryTag = newTag;
- return oldTag;
-}
-
struct TMemoryUsageSnapshot
{
TSpinLock Lock;
diff --git a/yt/yt/library/ytprof/heap_profiler.h b/yt/yt/library/ytprof/heap_profiler.h
index 31489987fe..455e710adc 100644
--- a/yt/yt/library/ytprof/heap_profiler.h
+++ b/yt/yt/library/ytprof/heap_profiler.h
@@ -23,10 +23,6 @@ int AbslStackUnwinder(void** frames, int*,
using TMemoryTag = uintptr_t;
-TMemoryTag GetMemoryTag();
-
-TMemoryTag SetMemoryTag(TMemoryTag newTag);
-
THashMap<TMemoryTag, ui64> GetEstimatedMemoryUsage();
void UpdateMemoryUsageSnapshot(THashMap<TMemoryTag, ui64> usageSnapshot);