summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorni-stoiko <[email protected]>2023-08-16 18:28:27 +0300
committerni-stoiko <[email protected]>2023-08-16 20:26:40 +0300
commitc354caae81dae5f18cd82554ef6746a347907e8f (patch)
treec2c63fc4465da3f9a2dea231f5006bec2812c6dc
parenta5f07dbb2c2d83106f82c3eb29d1dd4dabaeaabb (diff)
YT-19555: Using Allocation tags for access MemoryTag
Applied changes for allocation_tags Changes are applied for heap_profiler
-rw-r--r--yt/yt/core/actions/bind-inl.h25
-rw-r--r--yt/yt/core/concurrency/action_queue.cpp2
-rw-r--r--yt/yt/core/concurrency/fiber_scheduler_thread.cpp10
-rw-r--r--yt/yt/core/concurrency/unittests/scheduler_ut.cpp9
-rw-r--r--yt/yt/core/misc/unittests/memory_tag_ut.cpp112
-rw-r--r--yt/yt/core/tracing/allocation_hooks.cpp22
-rw-r--r--yt/yt/core/tracing/allocation_tags.cpp27
-rw-r--r--yt/yt/core/tracing/allocation_tags.h12
-rw-r--r--yt/yt/core/tracing/public.h4
-rw-r--r--yt/yt/core/tracing/trace_context-inl.h104
-rw-r--r--yt/yt/core/tracing/trace_context.cpp60
-rw-r--r--yt/yt/core/tracing/trace_context.h47
-rw-r--r--yt/yt/core/tracing/unittests/allocation_tags_ut.cpp44
-rw-r--r--yt/yt/core/tracing/unittests/ya.make18
-rw-r--r--yt/yt/core/ya.make1
15 files changed, 444 insertions, 53 deletions
diff --git a/yt/yt/core/actions/bind-inl.h b/yt/yt/core/actions/bind-inl.h
index f88d3116d93..48a57a4ebdc 100644
--- a/yt/yt/core/actions/bind-inl.h
+++ b/yt/yt/core/actions/bind-inl.h
@@ -7,6 +7,10 @@
#include <yt/yt/core/concurrency/propagating_storage.h>
+#include <yt/yt/core/tracing/trace_context.h>
+
+#include <library/cpp/yt/memory/memory_tag.h>
+
namespace NYT {
////////////////////////////////////////////////////////////////////////////////
@@ -504,12 +508,27 @@ public:
auto* volatile unoptimizedState = state;
Y_UNUSED(unoptimizedState);
+ TMemoryTag memoryTag = GetCurrentMemoryTag();
+
auto propagatingStorageGuard = state->MakePropagatingStorageGuard();
Y_UNUSED(propagatingStorageGuard);
- return state->Functor(
- NDetail::Unwrap(std::get<BoundIndexes>(state->BoundArgs))...,
- std::forward<TAs>(args)...);
+ if (memoryTag != NullMemoryTag) {
+ auto traceContext = NTracing::GetOrCreateTraceContext("BindMemoryTag");
+
+ // Does NOT finish the trace context upon destruction.
+ NTracing::TCurrentTraceContextGuard contextGuard(traceContext);
+
+ traceContext->SetAllocationTag(NTracing::MemoryTagLiteral, memoryTag);
+
+ return state->Functor(
+ NDetail::Unwrap(std::get<BoundIndexes>(state->BoundArgs))...,
+ std::forward<TAs>(args)...);
+ } else {
+ return state->Functor(
+ NDetail::Unwrap(std::get<BoundIndexes>(state->BoundArgs))...,
+ std::forward<TAs>(args)...);
+ }
}
private:
diff --git a/yt/yt/core/concurrency/action_queue.cpp b/yt/yt/core/concurrency/action_queue.cpp
index fa1f5e4206a..aa42272a75a 100644
--- a/yt/yt/core/concurrency/action_queue.cpp
+++ b/yt/yt/core/concurrency/action_queue.cpp
@@ -15,6 +15,8 @@
#include <yt/yt/core/misc/ring_queue.h>
#include <yt/yt/core/misc/shutdown.h>
+#include <library/cpp/yt/memory/memory_tag.h>
+
#include <util/thread/lfqueue.h>
namespace NYT::NConcurrency {
diff --git a/yt/yt/core/concurrency/fiber_scheduler_thread.cpp b/yt/yt/core/concurrency/fiber_scheduler_thread.cpp
index 0a1525def6e..32112db71ac 100644
--- a/yt/yt/core/concurrency/fiber_scheduler_thread.cpp
+++ b/yt/yt/core/concurrency/fiber_scheduler_thread.cpp
@@ -3,20 +3,20 @@
#include "private.h"
#include "fiber.h"
+#include <yt/yt/library/profiling/producer.h>
+
+#include <yt/yt/core/actions/invoker_util.h>
+
#include <yt/yt/core/misc/finally.h>
#include <yt/yt/core/misc/shutdown.h>
#include <yt/yt/core/misc/singleton.h>
-#include <yt/yt/library/profiling/producer.h>
-
-#include <yt/yt/core/actions/invoker_util.h>
+#include <yt/yt/core/tracing/trace_context.h>
#include <library/cpp/yt/memory/memory_tag.h>
#include <library/cpp/yt/threading/fork_aware_spin_lock.h>
-#include <library/cpp/yt/memory/memory_tag.h>
-
#include <util/thread/lfstack.h>
#include <thread>
diff --git a/yt/yt/core/concurrency/unittests/scheduler_ut.cpp b/yt/yt/core/concurrency/unittests/scheduler_ut.cpp
index 291f2b8d49e..cc36a2d99e0 100644
--- a/yt/yt/core/concurrency/unittests/scheduler_ut.cpp
+++ b/yt/yt/core/concurrency/unittests/scheduler_ut.cpp
@@ -4,6 +4,7 @@
#include <yt/yt/core/actions/current_invoker.h>
// TODO(lukyan): Move invoker_detail to concurrency? Merge concurrency and actions?
#include <yt/yt/core/actions/invoker_detail.h>
+#include <yt/yt/core/actions/invoker_util.h>
#include <yt/yt/core/concurrency/scheduler.h>
#include <yt/yt/core/concurrency/action_queue.h>
@@ -14,13 +15,9 @@
#include <yt/yt/core/concurrency/two_level_fair_share_thread_pool.h>
#include <yt/yt/core/concurrency/new_fair_share_thread_pool.h>
-#include <yt/yt/core/profiling/timing.h>
-
#include <yt/yt/core/logging/log.h>
-#include <yt/yt/core/actions/cancelable_context.h>
-#include <yt/yt/core/actions/invoker_util.h>
-
+#include <yt/yt/core/misc/finally.h>
#include <yt/yt/core/misc/lazy_ptr.h>
#include <yt/yt/core/misc/proc.h>
@@ -29,8 +26,6 @@
#include <yt/yt/core/tracing/config.h>
#include <yt/yt/core/tracing/trace_context.h>
-#include <yt/yt/core/misc/finally.h>
-
#include <yt/yt/core/ytree/helpers.h>
#include <library/cpp/yt/threading/count_down_latch.h>
diff --git a/yt/yt/core/misc/unittests/memory_tag_ut.cpp b/yt/yt/core/misc/unittests/memory_tag_ut.cpp
index 346cb86e0b1..1c80aea4e07 100644
--- a/yt/yt/core/misc/unittests/memory_tag_ut.cpp
+++ b/yt/yt/core/misc/unittests/memory_tag_ut.cpp
@@ -6,15 +6,17 @@
#include <yt/yt/core/concurrency/thread_pool.h>
#include <yt/yt/core/concurrency/scheduler.h>
+#include <yt/yt/core/misc/lazy_ptr.h>
+
+#include <yt/yt/core/tracing/allocation_tags.h>
+#include <yt/yt/core/tracing/trace_context.h>
+
#include <library/cpp/yt/memory/memory_tag.h>
#include <util/random/random.h>
#include <util/system/compiler.h>
-// These tests do not work under MSAN and ASAN.
-#if !defined(_msan_enabled_) and !defined(_asan_enabled_) and defined(_linux_) and defined(YT_ALLOC_ENABLED)
-
namespace NYT {
////////////////////////////////////////////////////////////////////////////////
@@ -22,13 +24,12 @@ namespace NYT {
// Used for fake side effects to disable compiler optimizations.
volatile const void* FakeSideEffectVolatileVariable = nullptr;
-////////////////////////////////////////////////////////////////////////////////
-
-namespace {
-
using namespace NConcurrency;
using namespace ::testing;
+// These tests do not work under MSAN and ASAN.
+#if !defined(_msan_enabled_) and !defined(_asan_enabled_) and defined(_linux_) and defined(YT_ALLOC_ENABLED)
+
////////////////////////////////////////////////////////////////////////////////
class TMemoryTagTest
@@ -238,7 +239,98 @@ INSTANTIATE_TEST_SUITE_P(MemoryTagTest, TMemoryTagTest, Values(
////////////////////////////////////////////////////////////////////////////////
-} // namespace
-} // namespace NYT
-
#endif // !defined(_msan_enabled_)
+
+////////////////////////////////////////////////////////////////////////////////
+
+using namespace NTracing;
+
+TEST(MemoryTagTest, MemoryTagPropagationViaAllocationTags)
+{
+ auto localContext = CreateTraceContextFromCurrent("MemoryTagPropagation");
+ auto localTag = 1u;
+
+ localContext->SetAllocationTags({
+ {MemoryTagLiteral, ToString(localTag)}
+ });
+
+ auto guard = TCurrentTraceContextGuard(localContext);
+
+ auto actionQueue = New<TActionQueue>();
+
+ auto tag = 2u;
+ auto invoker = CreateMemoryTaggingInvoker(actionQueue->GetInvoker(), tag);
+ auto currentTag = TryGetCurrentTraceContext()->FindAllocationTag<TMemoryTag>(MemoryTagLiteral);
+ EXPECT_EQ(currentTag, localTag);
+
+ auto asyncResult = BIND_NO_PROPAGATE([=] {
+ auto currentTag = TryGetCurrentTraceContext()->FindAllocationTag<TMemoryTag>(MemoryTagLiteral);
+ EXPECT_EQ(currentTag, tag);
+ })
+ .AsyncVia(invoker)
+ .Run();
+
+ WaitFor(asyncResult)
+ .ThrowOnError();
+
+ currentTag = TryGetCurrentTraceContext()->FindAllocationTag<TMemoryTag>(MemoryTagLiteral);
+ EXPECT_EQ(currentTag, localTag);
+}
+
+void TestYield(TMemoryTag tag)
+{
+ auto currentTag = TryGetCurrentTraceContext()->FindAllocationTag<TMemoryTag>(MemoryTagLiteral);
+ EXPECT_EQ(currentTag, tag);
+
+ Yield();
+ currentTag = TryGetCurrentTraceContext()->FindAllocationTag<TMemoryTag>(MemoryTagLiteral);
+ EXPECT_EQ(currentTag, tag);
+
+ Yield();
+ currentTag = TryGetCurrentTraceContext()->FindAllocationTag<TMemoryTag>(MemoryTagLiteral);
+ EXPECT_EQ(currentTag, tag);
+}
+
+TEST(MemoryTagTest, MemoryTagWithYieldContextPropagation)
+{
+ auto localContext = CreateTraceContextFromCurrent("MemoryTagSwitchContextPropagation");
+ auto localTag = 1u;
+
+ localContext->SetAllocationTags({
+ {MemoryTagLiteral, ToString(localTag)}
+ });
+
+ auto guard = TCurrentTraceContextGuard(localContext);
+
+ auto actionQueue = New<TActionQueue>();
+
+ auto tag1 = 222u;
+ auto tag2 = 333u;
+
+ auto invoker1 = CreateMemoryTaggingInvoker(actionQueue->GetInvoker(), tag1);
+ auto invoker2 = CreateMemoryTaggingInvoker(actionQueue->GetInvoker(), tag2);
+ auto currentTag = TryGetCurrentTraceContext()->FindAllocationTag<TMemoryTag>(MemoryTagLiteral);
+ EXPECT_EQ(currentTag, localTag);
+
+ // Use BIND_NO_PROPAGATE in order not overwrite tags in localContext.
+
+ auto asyncResult1 = BIND_NO_PROPAGATE(TestYield)
+ .AsyncVia(invoker1)
+ .Run(tag1);
+
+ auto asyncResult2 = BIND_NO_PROPAGATE(TestYield)
+ .AsyncVia(invoker2)
+ .Run(tag2);
+
+ WaitFor(asyncResult1)
+ .ThrowOnError();
+ WaitFor(asyncResult2)
+ .ThrowOnError();
+
+ currentTag = TryGetCurrentTraceContext()->FindAllocationTag<TMemoryTag>(MemoryTagLiteral);
+ EXPECT_EQ(currentTag, localTag);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT
diff --git a/yt/yt/core/tracing/allocation_hooks.cpp b/yt/yt/core/tracing/allocation_hooks.cpp
index 873e410c15b..0a10e786165 100644
--- a/yt/yt/core/tracing/allocation_hooks.cpp
+++ b/yt/yt/core/tracing/allocation_hooks.cpp
@@ -1,6 +1,4 @@
#include "allocation_tags.h"
-
-#include "allocation_tags.h"
#include "trace_context.h"
#include <library/cpp/yt/memory/leaky_singleton.h>
@@ -21,8 +19,11 @@ void* CreateAllocationTagsData()
if (!traceContext) {
return nullptr;
}
- auto allocationTagsPtr = traceContext->GetAllocationTags();
- return static_cast<void*>(allocationTagsPtr.Release());
+
+ // Need to avoid deadlock from TTraceContext->SetAllocationTags due another allocation.
+ auto allocationTags = traceContext->GetAllocationTagsPtr();
+
+ return static_cast<void*>(allocationTags.Release());
}
void* CopyAllocationTagsData(void* ptr)
@@ -37,20 +38,27 @@ void* CopyAllocationTagsData(void* ptr)
void DestroyAllocationTagsData(void* ptr)
{
auto* allocationTagsPtr = static_cast<TAllocationTags*>(ptr);
- // NB. No need to check for nullptr here, because ScheduleFree already does that
+ // NB. No need to check for nullptr here, because ScheduleFree already does that.
FreeList->ScheduleFree(allocationTagsPtr);
}
-const std::vector<std::pair<TString, TString>>& ReadAllocationTagsData(void* ptr)
+const TAllocationTags::TTags& ReadAllocationTagsData(void* ptr)
{
auto* allocationTagsPtr = static_cast<TAllocationTags*>(ptr);
if (!allocationTagsPtr) {
- static std::vector<std::pair<TString, TString>> emptyTags;
+ static TAllocationTags::TTags emptyTags;
return emptyTags;
}
return allocationTagsPtr->GetTags();
}
+std::optional<TString> FindTagValue(
+ const TAllocationTags::TTags& tags,
+ const TString& key)
+{
+ return TAllocationTags::FindTagValue(tags, key);
+}
+
void StartAllocationTagsCleanupThread(TDuration cleanupInterval)
{
std::thread backgroundThread([cleanupInterval] {
diff --git a/yt/yt/core/tracing/allocation_tags.cpp b/yt/yt/core/tracing/allocation_tags.cpp
index 1c041cd9fc6..66177fdedbf 100644
--- a/yt/yt/core/tracing/allocation_tags.cpp
+++ b/yt/yt/core/tracing/allocation_tags.cpp
@@ -8,11 +8,36 @@ TAllocationTags::TAllocationTags(std::vector<std::pair<TString, TString>> tags)
: Tags_(std::move(tags))
{ }
-const TAllocationTags::TTags& TAllocationTags::GetTags() const
+const TAllocationTags::TTags& TAllocationTags::GetTags() const noexcept
{
return Tags_;
}
+std::optional<TAllocationTags::TValue> TAllocationTags::FindTagValue(const TKey& key) const
+{
+ return FindTagValue(Tags_, key);
+}
+
+std::optional<TAllocationTags::TValue> TAllocationTags::FindTagValue(
+ const TTags& tags,
+ const TKey& key)
+{
+ std::optional<TAllocationTags::TValue> value;
+
+ auto memoryTagIterator = std::find_if(
+ tags.cbegin(),
+ tags.cend(),
+ [&] (const auto& pair) {
+ return pair.first == key;
+ });
+
+ if (memoryTagIterator != tags.cend()) {
+ value = memoryTagIterator->second;
+ }
+
+ return value;
+}
+
TAllocationTagsFreeList::~TAllocationTagsFreeList()
{
Cleanup();
diff --git a/yt/yt/core/tracing/allocation_tags.h b/yt/yt/core/tracing/allocation_tags.h
index 1b6be263f56..549bdc30bc0 100644
--- a/yt/yt/core/tracing/allocation_tags.h
+++ b/yt/yt/core/tracing/allocation_tags.h
@@ -11,11 +11,19 @@ namespace NYT::NTracing {
class TAllocationTags : public TRefCounted
{
public:
- using TTags = std::vector<std::pair<TString, TString>>;
+ using TKey = TString;
+ using TValue = TString;
+ using TTags = std::vector<std::pair<TKey, TValue>>;
explicit TAllocationTags(TTags tags);
- const TTags& GetTags() const;
+ const TTags& GetTags() const noexcept;
+
+ std::optional<TValue> FindTagValue(const TKey& key) const;
+
+ static std::optional<TValue> FindTagValue(
+ const TTags& tags,
+ const TKey& key);
private:
friend class TAllocationTagsFreeList;
diff --git a/yt/yt/core/tracing/public.h b/yt/yt/core/tracing/public.h
index f6e8d090b55..abcb82b0b18 100644
--- a/yt/yt/core/tracing/public.h
+++ b/yt/yt/core/tracing/public.h
@@ -8,6 +8,10 @@ namespace NYT::NTracing {
////////////////////////////////////////////////////////////////////////////////
+constexpr auto MemoryTagLiteral = "memory_tag";
+
+////////////////////////////////////////////////////////////////////////////////
+
namespace NProto {
class TTracingExt;
diff --git a/yt/yt/core/tracing/trace_context-inl.h b/yt/yt/core/tracing/trace_context-inl.h
index 0aa73aadcaf..2466564904d 100644
--- a/yt/yt/core/tracing/trace_context-inl.h
+++ b/yt/yt/core/tracing/trace_context-inl.h
@@ -4,6 +4,10 @@
#include "trace_context.h"
#endif
+#include "allocation_tags.h"
+
+#include <yt/yt/core/concurrency/thread_affinity.h>
+
#include <atomic>
namespace NYT::NTracing {
@@ -77,6 +81,100 @@ void TTraceContext::AddTag(const TString& tagName, const T& tagValue)
AddTag(tagName, ToString(tagValue));
}
+template <typename TTag>
+std::optional<TTag> TTraceContext::DoFindAllocationTag(const TString& key) const
+{
+ VERIFY_SPINLOCK_AFFINITY(AllocationTagsRWLock_);
+
+ TAllocationTagsPtr tags = nullptr;
+
+ {
+ // Local guard for copy RefCounted AllocationTags_.
+ auto guard = Guard(AllocationTagsAsRefCountedSpinlock_);
+ tags = AllocationTags_;
+ }
+
+ if (tags) {
+ auto valueOpt = tags->FindTagValue(key);
+
+ if (valueOpt.has_value()) {
+ return FromString<TTag>(valueOpt.value());
+ }
+ }
+
+ return std::nullopt;
+}
+
+template <typename TTag>
+std::optional<TTag> TTraceContext::FindAllocationTag(const TString& key) const
+{
+ auto readerGuard = ReaderGuard(AllocationTagsRWLock_);
+ return DoFindAllocationTag<TTag>(key);
+}
+
+template <typename TTag>
+std::optional<TTag> TTraceContext::RemoveAllocationTag(const TString& key)
+{
+ auto writerGuard = NThreading::WriterGuard(AllocationTagsRWLock_);
+ auto newTags = DoGetAllocationTags();
+
+ auto foundTagIt = std::remove_if(
+ newTags.begin(),
+ newTags.end(),
+ [&key] (const auto& pair) {
+ return pair.first == key;
+ });
+
+ std::optional<TTag> oldTag;
+
+ if (foundTagIt != newTags.end()) {
+ oldTag = FromString<TTag>(foundTagIt->second);
+ }
+
+ newTags.erase(foundTagIt, newTags.end());
+
+ DoSetAllocationTags(std::move(newTags));
+
+ return oldTag;
+}
+
+template <typename TTag>
+std::optional<TTag> TTraceContext::SetAllocationTag(const TString& key, TTag newTag)
+{
+ auto newTagString = ToString(newTag);
+
+ auto writerGuard = NThreading::WriterGuard(AllocationTagsRWLock_);
+ auto newTags = DoGetAllocationTags();
+
+ if (!newTags.empty()) {
+ std::optional<TString> oldTag;
+
+ auto tagIt = std::find_if(
+ newTags.begin(),
+ newTags.end(),
+ [&key] (const auto& pair) {
+ return pair.first == key;
+ });
+
+ if (tagIt != newTags.end()) {
+ oldTag = std::move(tagIt->second);
+ tagIt->second = std::move(newTagString);
+ } else {
+ newTags.emplace_back(key, std::move(newTagString));
+ }
+
+ DoSetAllocationTags(std::move(newTags));
+
+ if (oldTag.has_value()) {
+ return FromString<TTag>(oldTag.value());
+ }
+ } else {
+ DoSetAllocationTags({{key, std::move(newTagString)}});
+ }
+
+ return std::nullopt;
+}
+
////////////////////////////////////////////////////////////////////////////////
namespace NDetail {
@@ -222,6 +320,12 @@ Y_FORCE_INLINE TTraceContextPtr CreateTraceContextFromCurrent(TString spanName)
return context ? context->CreateChild(std::move(spanName)) : TTraceContext::NewRoot(std::move(spanName));
}
+Y_FORCE_INLINE TTraceContextPtr GetOrCreateTraceContext(TString spanNameIfCreate)
+{
+ auto* context = TryGetCurrentTraceContext();
+ return context ? context : TTraceContext::NewRoot(std::move(spanNameIfCreate));
+}
+
////////////////////////////////////////////////////////////////////////////////
template <class TFn>
diff --git a/yt/yt/core/tracing/trace_context.cpp b/yt/yt/core/tracing/trace_context.cpp
index 152b01627ed..d813e84db4b 100644
--- a/yt/yt/core/tracing/trace_context.cpp
+++ b/yt/yt/core/tracing/trace_context.cpp
@@ -264,22 +264,66 @@ void TTraceContext::SetLoggingTag(const TString& loggingTag)
LoggingTag_ = loggingTag;
}
-void TTraceContext::SetAllocationTags(TAllocationTagsPtr tags)
+void TTraceContext::ClearAllocationTagsPtr() noexcept
{
- AllocationTags_ = std::move(tags);
+ auto writerGuard = WriterGuard(AllocationTagsRWLock_);
+ auto guard = Guard(AllocationTagsAsRefCountedSpinlock_);
+ AllocationTags_ = nullptr;
}
-TAllocationTagsPtr TTraceContext::GetAllocationTags() const
+TAllocationTags::TTags TTraceContext::DoGetAllocationTags() const
{
- return AllocationTags_;
+ VERIFY_SPINLOCK_AFFINITY(AllocationTagsRWLock_);
+
+ TAllocationTagsPtr tags = nullptr;
+
+ {
+ // Local guard for copy RefCounted AllocationTags_.
+ auto guard = Guard(AllocationTagsAsRefCountedSpinlock_);
+ tags = AllocationTags_;
+ }
+
+ if (tags != nullptr) {
+ return tags->GetTags();
+ }
+
+ return {};
+}
+
+TAllocationTags::TTags TTraceContext::GetAllocationTags() const
+{
+ auto readerGuard = ReaderGuard(AllocationTagsRWLock_);
+ return DoGetAllocationTags();
}
-std::vector<std::pair<TString, TString>> TTraceContext::ExtractAllocationTags() const
+TAllocationTagsPtr TTraceContext::GetAllocationTagsPtr() const noexcept
{
- if (AllocationTags_ != nullptr) {
- return AllocationTags_->GetTags();
+ // Local guard for copy RefCounted AllocationTags_ for allocator callback CreateAllocationTagsData().
+ auto guard = Guard(AllocationTagsAsRefCountedSpinlock_);
+
+ auto copy = AllocationTags_;
+ return copy;
+}
+
+void TTraceContext::DoSetAllocationTags(TAllocationTags::TTags&& tags)
+{
+ VERIFY_SPINLOCK_AFFINITY(AllocationTagsRWLock_);
+
+ TAllocationTagsPtr allocationTagsPtr = nullptr;
+ if (!tags.empty()) {
+ // Allocation MUST be done BEFORE Guard(AllocationTagsAsRefCountedSpinlock_) to avoid deadlock with CreateAllocationTagsData().
+ allocationTagsPtr = New<TAllocationTags>(std::move(tags));
}
- return {};
+
+ auto guard = Guard(AllocationTagsAsRefCountedSpinlock_);
+ AllocationTags_ = allocationTagsPtr;
+}
+
+void TTraceContext::SetAllocationTags(TAllocationTags::TTags&& tags)
+{
+ auto writerGuard = WriterGuard(AllocationTagsRWLock_);
+
+ return DoSetAllocationTags(std::move(tags));
}
void TTraceContext::SetRecorded()
diff --git a/yt/yt/core/tracing/trace_context.h b/yt/yt/core/tracing/trace_context.h
index a6123b10bcc..7276a802f5c 100644
--- a/yt/yt/core/tracing/trace_context.h
+++ b/yt/yt/core/tracing/trace_context.h
@@ -1,7 +1,11 @@
#pragma once
+#include "allocation_tags.h"
+#include "library/cpp/yt/threading/rw_spin_lock.h"
#include "public.h"
+#include <yt/yt/library/tracing/public.h>
+
#include <yt/yt/core/misc/guid.h>
#include <yt/yt/core/profiling/public.h>
@@ -10,8 +14,6 @@
#include <yt/yt/core/concurrency/public.h>
-#include <yt/yt/library/tracing/public.h>
-
#include <library/cpp/yt/threading/spin_lock.h>
#include <atomic>
@@ -119,13 +121,24 @@ public:
void SetRequestId(TRequestId requestId);
TRequestId GetRequestId() const;
- //! Sets allocation tags.
- /*!
- * Not thread-safe.
- */
- void SetAllocationTags(TAllocationTagsPtr tags);
- TAllocationTagsPtr GetAllocationTags() const;
- std::vector<std::pair<TString, TString>> ExtractAllocationTags() const;
+ void SetAllocationTags(TAllocationTags::TTags&& tags);
+
+ TAllocationTags::TTags GetAllocationTags() const;
+
+ TAllocationTagsPtr GetAllocationTagsPtr() const noexcept;
+
+ void ClearAllocationTagsPtr() noexcept;
+
+ template <typename TTag>
+ std::optional<TTag> FindAllocationTag(const TString& key) const;
+
+ template <typename TTag>
+ std::optional<TTag> SetAllocationTag(
+ const TString& key,
+ TTag value);
+
+ template <typename TTag>
+ std::optional<TTag> RemoveAllocationTag(const TString& key);
//! Sets logging tag.
/*!
@@ -229,6 +242,10 @@ private:
NYson::TYsonString Baggage_;
std::vector<std::pair<TString, std::variant<TString, i64>>> ProfilingTags_;
+
+ // Must NOT allocate memory on the heap in callbacks with usage of AllocationTags_ to avoid deadlock with allocator.
+ YT_DECLARE_SPIN_LOCK(NThreading::TReaderWriterSpinLock, AllocationTagsRWLock_);
+ YT_DECLARE_SPIN_LOCK(NThreading::TSpinLock, AllocationTagsAsRefCountedSpinlock_);
TAllocationTagsPtr AllocationTags_;
TTraceContext(
@@ -238,6 +255,17 @@ private:
DECLARE_NEW_FRIEND()
void SetDuration();
+
+ void DoSetAllocationTags(TAllocationTags::TTags&& tags);
+
+ template <typename TTag>
+ std::optional<TTag> DoSetAllocationTag(const TString& key, TTag newTag);
+
+ TAllocationTags::TTags DoGetAllocationTags() const;
+
+ template <typename TTag>
+ std::optional<TTag> DoFindAllocationTag(const TString& key) const;
+
};
DEFINE_REFCOUNTED_TYPE(TTraceContext)
@@ -391,7 +419,6 @@ private:
////////////////////////////////////////////////////////////////////////////////
-
} // namespace NYT::NTracing
#define TRACE_CONTEXT_INL_H_
diff --git a/yt/yt/core/tracing/unittests/allocation_tags_ut.cpp b/yt/yt/core/tracing/unittests/allocation_tags_ut.cpp
new file mode 100644
index 00000000000..8f7e79d15b5
--- /dev/null
+++ b/yt/yt/core/tracing/unittests/allocation_tags_ut.cpp
@@ -0,0 +1,44 @@
+#include <yt/yt/core/test_framework/framework.h>
+
+#include <yt/yt/core/tracing/allocation_tags.h>
+#include <yt/yt/core/tracing/trace_context.h>
+
+namespace NYT::NTracing {
+
+////////////////////////////////////////////////////////////////////////////////
+
+TEST(TestAllocationTags, GetSetAllocationTags)
+{
+ auto traceContext = TTraceContext::NewRoot("Root");
+ TTraceContextGuard guard(traceContext);
+
+ ASSERT_EQ(traceContext->FindAllocationTag<TString>("a"), std::nullopt);
+
+ traceContext->SetAllocationTags({{"user", "first"}, {"sometag", "my"}});
+ ASSERT_EQ(traceContext->FindAllocationTag<TMemoryTag>(MemoryTagLiteral), std::nullopt);
+ ASSERT_EQ(traceContext->FindAllocationTag<TString>("user"), "first");
+ ASSERT_EQ(traceContext->FindAllocationTag<TString>("sometag"), "my");
+ ASSERT_EQ(traceContext->FindAllocationTag<TString>("other"), std::nullopt);
+ ASSERT_EQ(traceContext->FindAllocationTag<int>("other"), std::nullopt);
+
+ traceContext->SetAllocationTag<TString>("a", "e");
+
+ ASSERT_EQ(traceContext->FindAllocationTag<TString>("a"), "e");
+
+ traceContext->RemoveAllocationTag<TString>("a");
+ ASSERT_EQ(traceContext->FindAllocationTag<TString>("a"), std::nullopt);
+
+ traceContext->RemoveAllocationTag<TString>("user");
+ traceContext->RemoveAllocationTag<TString>("sometag");
+ ASSERT_EQ(traceContext->FindAllocationTag<TString>("user"), std::nullopt);
+ ASSERT_EQ(traceContext->FindAllocationTag<TString>("sometag"), std::nullopt);
+ ASSERT_TRUE(traceContext->GetAllocationTags().empty());
+
+ traceContext->SetAllocationTag<TMemoryTag>(MemoryTagLiteral, TMemoryTag{1});
+ ASSERT_EQ(traceContext->FindAllocationTag<TMemoryTag>(MemoryTagLiteral), TMemoryTag{1});
+ ASSERT_FALSE(traceContext->GetAllocationTags().empty());
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NTracing
diff --git a/yt/yt/core/tracing/unittests/ya.make b/yt/yt/core/tracing/unittests/ya.make
new file mode 100644
index 00000000000..1b64215672d
--- /dev/null
+++ b/yt/yt/core/tracing/unittests/ya.make
@@ -0,0 +1,18 @@
+GTEST(unittester-core-tracing)
+
+INCLUDE(${ARCADIA_ROOT}/yt/ya_cpp.make.inc)
+
+SRCS(
+ allocation_tags_ut.cpp
+)
+
+INCLUDE(${ARCADIA_ROOT}/yt/opensource_tests.inc)
+
+PEERDIR(
+ yt/yt/core
+ yt/yt/core/test_framework
+)
+
+SIZE(SMALL)
+
+END()
diff --git a/yt/yt/core/ya.make b/yt/yt/core/ya.make
index 884956c8304..6478c3f2ff4 100644
--- a/yt/yt/core/ya.make
+++ b/yt/yt/core/ya.make
@@ -367,6 +367,7 @@ RECURSE_FOR_TESTS(
http/unittests
misc/unittests
net/unittests
+ tracing/unittests
yson/unittests
http/mock
net/mock