diff options
author | ni-stoiko <[email protected]> | 2023-08-16 18:28:27 +0300 |
---|---|---|
committer | ni-stoiko <[email protected]> | 2023-08-16 20:26:40 +0300 |
commit | c354caae81dae5f18cd82554ef6746a347907e8f (patch) | |
tree | c2c63fc4465da3f9a2dea231f5006bec2812c6dc | |
parent | a5f07dbb2c2d83106f82c3eb29d1dd4dabaeaabb (diff) |
YT-19555: Using Allocation tags for access MemoryTag
Applied changes for allocation_tags
Changes are applied for heap_profiler
-rw-r--r-- | yt/yt/core/actions/bind-inl.h | 25 | ||||
-rw-r--r-- | yt/yt/core/concurrency/action_queue.cpp | 2 | ||||
-rw-r--r-- | yt/yt/core/concurrency/fiber_scheduler_thread.cpp | 10 | ||||
-rw-r--r-- | yt/yt/core/concurrency/unittests/scheduler_ut.cpp | 9 | ||||
-rw-r--r-- | yt/yt/core/misc/unittests/memory_tag_ut.cpp | 112 | ||||
-rw-r--r-- | yt/yt/core/tracing/allocation_hooks.cpp | 22 | ||||
-rw-r--r-- | yt/yt/core/tracing/allocation_tags.cpp | 27 | ||||
-rw-r--r-- | yt/yt/core/tracing/allocation_tags.h | 12 | ||||
-rw-r--r-- | yt/yt/core/tracing/public.h | 4 | ||||
-rw-r--r-- | yt/yt/core/tracing/trace_context-inl.h | 104 | ||||
-rw-r--r-- | yt/yt/core/tracing/trace_context.cpp | 60 | ||||
-rw-r--r-- | yt/yt/core/tracing/trace_context.h | 47 | ||||
-rw-r--r-- | yt/yt/core/tracing/unittests/allocation_tags_ut.cpp | 44 | ||||
-rw-r--r-- | yt/yt/core/tracing/unittests/ya.make | 18 | ||||
-rw-r--r-- | yt/yt/core/ya.make | 1 |
15 files changed, 444 insertions, 53 deletions
diff --git a/yt/yt/core/actions/bind-inl.h b/yt/yt/core/actions/bind-inl.h index f88d3116d93..48a57a4ebdc 100644 --- a/yt/yt/core/actions/bind-inl.h +++ b/yt/yt/core/actions/bind-inl.h @@ -7,6 +7,10 @@ #include <yt/yt/core/concurrency/propagating_storage.h> +#include <yt/yt/core/tracing/trace_context.h> + +#include <library/cpp/yt/memory/memory_tag.h> + namespace NYT { //////////////////////////////////////////////////////////////////////////////// @@ -504,12 +508,27 @@ public: auto* volatile unoptimizedState = state; Y_UNUSED(unoptimizedState); + TMemoryTag memoryTag = GetCurrentMemoryTag(); + auto propagatingStorageGuard = state->MakePropagatingStorageGuard(); Y_UNUSED(propagatingStorageGuard); - return state->Functor( - NDetail::Unwrap(std::get<BoundIndexes>(state->BoundArgs))..., - std::forward<TAs>(args)...); + if (memoryTag != NullMemoryTag) { + auto traceContext = NTracing::GetOrCreateTraceContext("BindMemoryTag"); + + // Does NOT finish the trace context upon destruction. + NTracing::TCurrentTraceContextGuard contextGuard(traceContext); + + traceContext->SetAllocationTag(NTracing::MemoryTagLiteral, memoryTag); + + return state->Functor( + NDetail::Unwrap(std::get<BoundIndexes>(state->BoundArgs))..., + std::forward<TAs>(args)...); + } else { + return state->Functor( + NDetail::Unwrap(std::get<BoundIndexes>(state->BoundArgs))..., + std::forward<TAs>(args)...); + } } private: diff --git a/yt/yt/core/concurrency/action_queue.cpp b/yt/yt/core/concurrency/action_queue.cpp index fa1f5e4206a..aa42272a75a 100644 --- a/yt/yt/core/concurrency/action_queue.cpp +++ b/yt/yt/core/concurrency/action_queue.cpp @@ -15,6 +15,8 @@ #include <yt/yt/core/misc/ring_queue.h> #include <yt/yt/core/misc/shutdown.h> +#include <library/cpp/yt/memory/memory_tag.h> + #include <util/thread/lfqueue.h> namespace NYT::NConcurrency { diff --git a/yt/yt/core/concurrency/fiber_scheduler_thread.cpp b/yt/yt/core/concurrency/fiber_scheduler_thread.cpp index 0a1525def6e..32112db71ac 100644 --- a/yt/yt/core/concurrency/fiber_scheduler_thread.cpp +++ b/yt/yt/core/concurrency/fiber_scheduler_thread.cpp @@ -3,20 +3,20 @@ #include "private.h" #include "fiber.h" +#include <yt/yt/library/profiling/producer.h> + +#include <yt/yt/core/actions/invoker_util.h> + #include <yt/yt/core/misc/finally.h> #include <yt/yt/core/misc/shutdown.h> #include <yt/yt/core/misc/singleton.h> -#include <yt/yt/library/profiling/producer.h> - -#include <yt/yt/core/actions/invoker_util.h> +#include <yt/yt/core/tracing/trace_context.h> #include <library/cpp/yt/memory/memory_tag.h> #include <library/cpp/yt/threading/fork_aware_spin_lock.h> -#include <library/cpp/yt/memory/memory_tag.h> - #include <util/thread/lfstack.h> #include <thread> diff --git a/yt/yt/core/concurrency/unittests/scheduler_ut.cpp b/yt/yt/core/concurrency/unittests/scheduler_ut.cpp index 291f2b8d49e..cc36a2d99e0 100644 --- a/yt/yt/core/concurrency/unittests/scheduler_ut.cpp +++ b/yt/yt/core/concurrency/unittests/scheduler_ut.cpp @@ -4,6 +4,7 @@ #include <yt/yt/core/actions/current_invoker.h> // TODO(lukyan): Move invoker_detail to concurrency? Merge concurrency and actions? #include <yt/yt/core/actions/invoker_detail.h> +#include <yt/yt/core/actions/invoker_util.h> #include <yt/yt/core/concurrency/scheduler.h> #include <yt/yt/core/concurrency/action_queue.h> @@ -14,13 +15,9 @@ #include <yt/yt/core/concurrency/two_level_fair_share_thread_pool.h> #include <yt/yt/core/concurrency/new_fair_share_thread_pool.h> -#include <yt/yt/core/profiling/timing.h> - #include <yt/yt/core/logging/log.h> -#include <yt/yt/core/actions/cancelable_context.h> -#include <yt/yt/core/actions/invoker_util.h> - +#include <yt/yt/core/misc/finally.h> #include <yt/yt/core/misc/lazy_ptr.h> #include <yt/yt/core/misc/proc.h> @@ -29,8 +26,6 @@ #include <yt/yt/core/tracing/config.h> #include <yt/yt/core/tracing/trace_context.h> -#include <yt/yt/core/misc/finally.h> - #include <yt/yt/core/ytree/helpers.h> #include <library/cpp/yt/threading/count_down_latch.h> diff --git a/yt/yt/core/misc/unittests/memory_tag_ut.cpp b/yt/yt/core/misc/unittests/memory_tag_ut.cpp index 346cb86e0b1..1c80aea4e07 100644 --- a/yt/yt/core/misc/unittests/memory_tag_ut.cpp +++ b/yt/yt/core/misc/unittests/memory_tag_ut.cpp @@ -6,15 +6,17 @@ #include <yt/yt/core/concurrency/thread_pool.h> #include <yt/yt/core/concurrency/scheduler.h> +#include <yt/yt/core/misc/lazy_ptr.h> + +#include <yt/yt/core/tracing/allocation_tags.h> +#include <yt/yt/core/tracing/trace_context.h> + #include <library/cpp/yt/memory/memory_tag.h> #include <util/random/random.h> #include <util/system/compiler.h> -// These tests do not work under MSAN and ASAN. -#if !defined(_msan_enabled_) and !defined(_asan_enabled_) and defined(_linux_) and defined(YT_ALLOC_ENABLED) - namespace NYT { //////////////////////////////////////////////////////////////////////////////// @@ -22,13 +24,12 @@ namespace NYT { // Used for fake side effects to disable compiler optimizations. volatile const void* FakeSideEffectVolatileVariable = nullptr; -//////////////////////////////////////////////////////////////////////////////// - -namespace { - using namespace NConcurrency; using namespace ::testing; +// These tests do not work under MSAN and ASAN. +#if !defined(_msan_enabled_) and !defined(_asan_enabled_) and defined(_linux_) and defined(YT_ALLOC_ENABLED) + //////////////////////////////////////////////////////////////////////////////// class TMemoryTagTest @@ -238,7 +239,98 @@ INSTANTIATE_TEST_SUITE_P(MemoryTagTest, TMemoryTagTest, Values( //////////////////////////////////////////////////////////////////////////////// -} // namespace -} // namespace NYT - #endif // !defined(_msan_enabled_) + +//////////////////////////////////////////////////////////////////////////////// + +using namespace NTracing; + +TEST(MemoryTagTest, MemoryTagPropagationViaAllocationTags) +{ + auto localContext = CreateTraceContextFromCurrent("MemoryTagPropagation"); + auto localTag = 1u; + + localContext->SetAllocationTags({ + {MemoryTagLiteral, ToString(localTag)} + }); + + auto guard = TCurrentTraceContextGuard(localContext); + + auto actionQueue = New<TActionQueue>(); + + auto tag = 2u; + auto invoker = CreateMemoryTaggingInvoker(actionQueue->GetInvoker(), tag); + auto currentTag = TryGetCurrentTraceContext()->FindAllocationTag<TMemoryTag>(MemoryTagLiteral); + EXPECT_EQ(currentTag, localTag); + + auto asyncResult = BIND_NO_PROPAGATE([=] { + auto currentTag = TryGetCurrentTraceContext()->FindAllocationTag<TMemoryTag>(MemoryTagLiteral); + EXPECT_EQ(currentTag, tag); + }) + .AsyncVia(invoker) + .Run(); + + WaitFor(asyncResult) + .ThrowOnError(); + + currentTag = TryGetCurrentTraceContext()->FindAllocationTag<TMemoryTag>(MemoryTagLiteral); + EXPECT_EQ(currentTag, localTag); +} + +void TestYield(TMemoryTag tag) +{ + auto currentTag = TryGetCurrentTraceContext()->FindAllocationTag<TMemoryTag>(MemoryTagLiteral); + EXPECT_EQ(currentTag, tag); + + Yield(); + currentTag = TryGetCurrentTraceContext()->FindAllocationTag<TMemoryTag>(MemoryTagLiteral); + EXPECT_EQ(currentTag, tag); + + Yield(); + currentTag = TryGetCurrentTraceContext()->FindAllocationTag<TMemoryTag>(MemoryTagLiteral); + EXPECT_EQ(currentTag, tag); +} + +TEST(MemoryTagTest, MemoryTagWithYieldContextPropagation) +{ + auto localContext = CreateTraceContextFromCurrent("MemoryTagSwitchContextPropagation"); + auto localTag = 1u; + + localContext->SetAllocationTags({ + {MemoryTagLiteral, ToString(localTag)} + }); + + auto guard = TCurrentTraceContextGuard(localContext); + + auto actionQueue = New<TActionQueue>(); + + auto tag1 = 222u; + auto tag2 = 333u; + + auto invoker1 = CreateMemoryTaggingInvoker(actionQueue->GetInvoker(), tag1); + auto invoker2 = CreateMemoryTaggingInvoker(actionQueue->GetInvoker(), tag2); + auto currentTag = TryGetCurrentTraceContext()->FindAllocationTag<TMemoryTag>(MemoryTagLiteral); + EXPECT_EQ(currentTag, localTag); + + // Use BIND_NO_PROPAGATE in order not overwrite tags in localContext. + + auto asyncResult1 = BIND_NO_PROPAGATE(TestYield) + .AsyncVia(invoker1) + .Run(tag1); + + auto asyncResult2 = BIND_NO_PROPAGATE(TestYield) + .AsyncVia(invoker2) + .Run(tag2); + + WaitFor(asyncResult1) + .ThrowOnError(); + WaitFor(asyncResult2) + .ThrowOnError(); + + currentTag = TryGetCurrentTraceContext()->FindAllocationTag<TMemoryTag>(MemoryTagLiteral); + EXPECT_EQ(currentTag, localTag); +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT diff --git a/yt/yt/core/tracing/allocation_hooks.cpp b/yt/yt/core/tracing/allocation_hooks.cpp index 873e410c15b..0a10e786165 100644 --- a/yt/yt/core/tracing/allocation_hooks.cpp +++ b/yt/yt/core/tracing/allocation_hooks.cpp @@ -1,6 +1,4 @@ #include "allocation_tags.h" - -#include "allocation_tags.h" #include "trace_context.h" #include <library/cpp/yt/memory/leaky_singleton.h> @@ -21,8 +19,11 @@ void* CreateAllocationTagsData() if (!traceContext) { return nullptr; } - auto allocationTagsPtr = traceContext->GetAllocationTags(); - return static_cast<void*>(allocationTagsPtr.Release()); + + // Need to avoid deadlock from TTraceContext->SetAllocationTags due another allocation. + auto allocationTags = traceContext->GetAllocationTagsPtr(); + + return static_cast<void*>(allocationTags.Release()); } void* CopyAllocationTagsData(void* ptr) @@ -37,20 +38,27 @@ void* CopyAllocationTagsData(void* ptr) void DestroyAllocationTagsData(void* ptr) { auto* allocationTagsPtr = static_cast<TAllocationTags*>(ptr); - // NB. No need to check for nullptr here, because ScheduleFree already does that + // NB. No need to check for nullptr here, because ScheduleFree already does that. FreeList->ScheduleFree(allocationTagsPtr); } -const std::vector<std::pair<TString, TString>>& ReadAllocationTagsData(void* ptr) +const TAllocationTags::TTags& ReadAllocationTagsData(void* ptr) { auto* allocationTagsPtr = static_cast<TAllocationTags*>(ptr); if (!allocationTagsPtr) { - static std::vector<std::pair<TString, TString>> emptyTags; + static TAllocationTags::TTags emptyTags; return emptyTags; } return allocationTagsPtr->GetTags(); } +std::optional<TString> FindTagValue( + const TAllocationTags::TTags& tags, + const TString& key) +{ + return TAllocationTags::FindTagValue(tags, key); +} + void StartAllocationTagsCleanupThread(TDuration cleanupInterval) { std::thread backgroundThread([cleanupInterval] { diff --git a/yt/yt/core/tracing/allocation_tags.cpp b/yt/yt/core/tracing/allocation_tags.cpp index 1c041cd9fc6..66177fdedbf 100644 --- a/yt/yt/core/tracing/allocation_tags.cpp +++ b/yt/yt/core/tracing/allocation_tags.cpp @@ -8,11 +8,36 @@ TAllocationTags::TAllocationTags(std::vector<std::pair<TString, TString>> tags) : Tags_(std::move(tags)) { } -const TAllocationTags::TTags& TAllocationTags::GetTags() const +const TAllocationTags::TTags& TAllocationTags::GetTags() const noexcept { return Tags_; } +std::optional<TAllocationTags::TValue> TAllocationTags::FindTagValue(const TKey& key) const +{ + return FindTagValue(Tags_, key); +} + +std::optional<TAllocationTags::TValue> TAllocationTags::FindTagValue( + const TTags& tags, + const TKey& key) +{ + std::optional<TAllocationTags::TValue> value; + + auto memoryTagIterator = std::find_if( + tags.cbegin(), + tags.cend(), + [&] (const auto& pair) { + return pair.first == key; + }); + + if (memoryTagIterator != tags.cend()) { + value = memoryTagIterator->second; + } + + return value; +} + TAllocationTagsFreeList::~TAllocationTagsFreeList() { Cleanup(); diff --git a/yt/yt/core/tracing/allocation_tags.h b/yt/yt/core/tracing/allocation_tags.h index 1b6be263f56..549bdc30bc0 100644 --- a/yt/yt/core/tracing/allocation_tags.h +++ b/yt/yt/core/tracing/allocation_tags.h @@ -11,11 +11,19 @@ namespace NYT::NTracing { class TAllocationTags : public TRefCounted { public: - using TTags = std::vector<std::pair<TString, TString>>; + using TKey = TString; + using TValue = TString; + using TTags = std::vector<std::pair<TKey, TValue>>; explicit TAllocationTags(TTags tags); - const TTags& GetTags() const; + const TTags& GetTags() const noexcept; + + std::optional<TValue> FindTagValue(const TKey& key) const; + + static std::optional<TValue> FindTagValue( + const TTags& tags, + const TKey& key); private: friend class TAllocationTagsFreeList; diff --git a/yt/yt/core/tracing/public.h b/yt/yt/core/tracing/public.h index f6e8d090b55..abcb82b0b18 100644 --- a/yt/yt/core/tracing/public.h +++ b/yt/yt/core/tracing/public.h @@ -8,6 +8,10 @@ namespace NYT::NTracing { //////////////////////////////////////////////////////////////////////////////// +constexpr auto MemoryTagLiteral = "memory_tag"; + +//////////////////////////////////////////////////////////////////////////////// + namespace NProto { class TTracingExt; diff --git a/yt/yt/core/tracing/trace_context-inl.h b/yt/yt/core/tracing/trace_context-inl.h index 0aa73aadcaf..2466564904d 100644 --- a/yt/yt/core/tracing/trace_context-inl.h +++ b/yt/yt/core/tracing/trace_context-inl.h @@ -4,6 +4,10 @@ #include "trace_context.h" #endif +#include "allocation_tags.h" + +#include <yt/yt/core/concurrency/thread_affinity.h> + #include <atomic> namespace NYT::NTracing { @@ -77,6 +81,100 @@ void TTraceContext::AddTag(const TString& tagName, const T& tagValue) AddTag(tagName, ToString(tagValue)); } +template <typename TTag> +std::optional<TTag> TTraceContext::DoFindAllocationTag(const TString& key) const +{ + VERIFY_SPINLOCK_AFFINITY(AllocationTagsRWLock_); + + TAllocationTagsPtr tags = nullptr; + + { + // Local guard for copy RefCounted AllocationTags_. + auto guard = Guard(AllocationTagsAsRefCountedSpinlock_); + tags = AllocationTags_; + } + + if (tags) { + auto valueOpt = tags->FindTagValue(key); + + if (valueOpt.has_value()) { + return FromString<TTag>(valueOpt.value()); + } + } + + return std::nullopt; +} + +template <typename TTag> +std::optional<TTag> TTraceContext::FindAllocationTag(const TString& key) const +{ + auto readerGuard = ReaderGuard(AllocationTagsRWLock_); + return DoFindAllocationTag<TTag>(key); +} + +template <typename TTag> +std::optional<TTag> TTraceContext::RemoveAllocationTag(const TString& key) +{ + auto writerGuard = NThreading::WriterGuard(AllocationTagsRWLock_); + auto newTags = DoGetAllocationTags(); + + auto foundTagIt = std::remove_if( + newTags.begin(), + newTags.end(), + [&key] (const auto& pair) { + return pair.first == key; + }); + + std::optional<TTag> oldTag; + + if (foundTagIt != newTags.end()) { + oldTag = FromString<TTag>(foundTagIt->second); + } + + newTags.erase(foundTagIt, newTags.end()); + + DoSetAllocationTags(std::move(newTags)); + + return oldTag; +} + +template <typename TTag> +std::optional<TTag> TTraceContext::SetAllocationTag(const TString& key, TTag newTag) +{ + auto newTagString = ToString(newTag); + + auto writerGuard = NThreading::WriterGuard(AllocationTagsRWLock_); + auto newTags = DoGetAllocationTags(); + + if (!newTags.empty()) { + std::optional<TString> oldTag; + + auto tagIt = std::find_if( + newTags.begin(), + newTags.end(), + [&key] (const auto& pair) { + return pair.first == key; + }); + + if (tagIt != newTags.end()) { + oldTag = std::move(tagIt->second); + tagIt->second = std::move(newTagString); + } else { + newTags.emplace_back(key, std::move(newTagString)); + } + + DoSetAllocationTags(std::move(newTags)); + + if (oldTag.has_value()) { + return FromString<TTag>(oldTag.value()); + } + } else { + DoSetAllocationTags({{key, std::move(newTagString)}}); + } + + return std::nullopt; +} + //////////////////////////////////////////////////////////////////////////////// namespace NDetail { @@ -222,6 +320,12 @@ Y_FORCE_INLINE TTraceContextPtr CreateTraceContextFromCurrent(TString spanName) return context ? context->CreateChild(std::move(spanName)) : TTraceContext::NewRoot(std::move(spanName)); } +Y_FORCE_INLINE TTraceContextPtr GetOrCreateTraceContext(TString spanNameIfCreate) +{ + auto* context = TryGetCurrentTraceContext(); + return context ? context : TTraceContext::NewRoot(std::move(spanNameIfCreate)); +} + //////////////////////////////////////////////////////////////////////////////// template <class TFn> diff --git a/yt/yt/core/tracing/trace_context.cpp b/yt/yt/core/tracing/trace_context.cpp index 152b01627ed..d813e84db4b 100644 --- a/yt/yt/core/tracing/trace_context.cpp +++ b/yt/yt/core/tracing/trace_context.cpp @@ -264,22 +264,66 @@ void TTraceContext::SetLoggingTag(const TString& loggingTag) LoggingTag_ = loggingTag; } -void TTraceContext::SetAllocationTags(TAllocationTagsPtr tags) +void TTraceContext::ClearAllocationTagsPtr() noexcept { - AllocationTags_ = std::move(tags); + auto writerGuard = WriterGuard(AllocationTagsRWLock_); + auto guard = Guard(AllocationTagsAsRefCountedSpinlock_); + AllocationTags_ = nullptr; } -TAllocationTagsPtr TTraceContext::GetAllocationTags() const +TAllocationTags::TTags TTraceContext::DoGetAllocationTags() const { - return AllocationTags_; + VERIFY_SPINLOCK_AFFINITY(AllocationTagsRWLock_); + + TAllocationTagsPtr tags = nullptr; + + { + // Local guard for copy RefCounted AllocationTags_. + auto guard = Guard(AllocationTagsAsRefCountedSpinlock_); + tags = AllocationTags_; + } + + if (tags != nullptr) { + return tags->GetTags(); + } + + return {}; +} + +TAllocationTags::TTags TTraceContext::GetAllocationTags() const +{ + auto readerGuard = ReaderGuard(AllocationTagsRWLock_); + return DoGetAllocationTags(); } -std::vector<std::pair<TString, TString>> TTraceContext::ExtractAllocationTags() const +TAllocationTagsPtr TTraceContext::GetAllocationTagsPtr() const noexcept { - if (AllocationTags_ != nullptr) { - return AllocationTags_->GetTags(); + // Local guard for copy RefCounted AllocationTags_ for allocator callback CreateAllocationTagsData(). + auto guard = Guard(AllocationTagsAsRefCountedSpinlock_); + + auto copy = AllocationTags_; + return copy; +} + +void TTraceContext::DoSetAllocationTags(TAllocationTags::TTags&& tags) +{ + VERIFY_SPINLOCK_AFFINITY(AllocationTagsRWLock_); + + TAllocationTagsPtr allocationTagsPtr = nullptr; + if (!tags.empty()) { + // Allocation MUST be done BEFORE Guard(AllocationTagsAsRefCountedSpinlock_) to avoid deadlock with CreateAllocationTagsData(). + allocationTagsPtr = New<TAllocationTags>(std::move(tags)); } - return {}; + + auto guard = Guard(AllocationTagsAsRefCountedSpinlock_); + AllocationTags_ = allocationTagsPtr; +} + +void TTraceContext::SetAllocationTags(TAllocationTags::TTags&& tags) +{ + auto writerGuard = WriterGuard(AllocationTagsRWLock_); + + return DoSetAllocationTags(std::move(tags)); } void TTraceContext::SetRecorded() diff --git a/yt/yt/core/tracing/trace_context.h b/yt/yt/core/tracing/trace_context.h index a6123b10bcc..7276a802f5c 100644 --- a/yt/yt/core/tracing/trace_context.h +++ b/yt/yt/core/tracing/trace_context.h @@ -1,7 +1,11 @@ #pragma once +#include "allocation_tags.h" +#include "library/cpp/yt/threading/rw_spin_lock.h" #include "public.h" +#include <yt/yt/library/tracing/public.h> + #include <yt/yt/core/misc/guid.h> #include <yt/yt/core/profiling/public.h> @@ -10,8 +14,6 @@ #include <yt/yt/core/concurrency/public.h> -#include <yt/yt/library/tracing/public.h> - #include <library/cpp/yt/threading/spin_lock.h> #include <atomic> @@ -119,13 +121,24 @@ public: void SetRequestId(TRequestId requestId); TRequestId GetRequestId() const; - //! Sets allocation tags. - /*! - * Not thread-safe. - */ - void SetAllocationTags(TAllocationTagsPtr tags); - TAllocationTagsPtr GetAllocationTags() const; - std::vector<std::pair<TString, TString>> ExtractAllocationTags() const; + void SetAllocationTags(TAllocationTags::TTags&& tags); + + TAllocationTags::TTags GetAllocationTags() const; + + TAllocationTagsPtr GetAllocationTagsPtr() const noexcept; + + void ClearAllocationTagsPtr() noexcept; + + template <typename TTag> + std::optional<TTag> FindAllocationTag(const TString& key) const; + + template <typename TTag> + std::optional<TTag> SetAllocationTag( + const TString& key, + TTag value); + + template <typename TTag> + std::optional<TTag> RemoveAllocationTag(const TString& key); //! Sets logging tag. /*! @@ -229,6 +242,10 @@ private: NYson::TYsonString Baggage_; std::vector<std::pair<TString, std::variant<TString, i64>>> ProfilingTags_; + + // Must NOT allocate memory on the heap in callbacks with usage of AllocationTags_ to avoid deadlock with allocator. + YT_DECLARE_SPIN_LOCK(NThreading::TReaderWriterSpinLock, AllocationTagsRWLock_); + YT_DECLARE_SPIN_LOCK(NThreading::TSpinLock, AllocationTagsAsRefCountedSpinlock_); TAllocationTagsPtr AllocationTags_; TTraceContext( @@ -238,6 +255,17 @@ private: DECLARE_NEW_FRIEND() void SetDuration(); + + void DoSetAllocationTags(TAllocationTags::TTags&& tags); + + template <typename TTag> + std::optional<TTag> DoSetAllocationTag(const TString& key, TTag newTag); + + TAllocationTags::TTags DoGetAllocationTags() const; + + template <typename TTag> + std::optional<TTag> DoFindAllocationTag(const TString& key) const; + }; DEFINE_REFCOUNTED_TYPE(TTraceContext) @@ -391,7 +419,6 @@ private: //////////////////////////////////////////////////////////////////////////////// - } // namespace NYT::NTracing #define TRACE_CONTEXT_INL_H_ diff --git a/yt/yt/core/tracing/unittests/allocation_tags_ut.cpp b/yt/yt/core/tracing/unittests/allocation_tags_ut.cpp new file mode 100644 index 00000000000..8f7e79d15b5 --- /dev/null +++ b/yt/yt/core/tracing/unittests/allocation_tags_ut.cpp @@ -0,0 +1,44 @@ +#include <yt/yt/core/test_framework/framework.h> + +#include <yt/yt/core/tracing/allocation_tags.h> +#include <yt/yt/core/tracing/trace_context.h> + +namespace NYT::NTracing { + +//////////////////////////////////////////////////////////////////////////////// + +TEST(TestAllocationTags, GetSetAllocationTags) +{ + auto traceContext = TTraceContext::NewRoot("Root"); + TTraceContextGuard guard(traceContext); + + ASSERT_EQ(traceContext->FindAllocationTag<TString>("a"), std::nullopt); + + traceContext->SetAllocationTags({{"user", "first"}, {"sometag", "my"}}); + ASSERT_EQ(traceContext->FindAllocationTag<TMemoryTag>(MemoryTagLiteral), std::nullopt); + ASSERT_EQ(traceContext->FindAllocationTag<TString>("user"), "first"); + ASSERT_EQ(traceContext->FindAllocationTag<TString>("sometag"), "my"); + ASSERT_EQ(traceContext->FindAllocationTag<TString>("other"), std::nullopt); + ASSERT_EQ(traceContext->FindAllocationTag<int>("other"), std::nullopt); + + traceContext->SetAllocationTag<TString>("a", "e"); + + ASSERT_EQ(traceContext->FindAllocationTag<TString>("a"), "e"); + + traceContext->RemoveAllocationTag<TString>("a"); + ASSERT_EQ(traceContext->FindAllocationTag<TString>("a"), std::nullopt); + + traceContext->RemoveAllocationTag<TString>("user"); + traceContext->RemoveAllocationTag<TString>("sometag"); + ASSERT_EQ(traceContext->FindAllocationTag<TString>("user"), std::nullopt); + ASSERT_EQ(traceContext->FindAllocationTag<TString>("sometag"), std::nullopt); + ASSERT_TRUE(traceContext->GetAllocationTags().empty()); + + traceContext->SetAllocationTag<TMemoryTag>(MemoryTagLiteral, TMemoryTag{1}); + ASSERT_EQ(traceContext->FindAllocationTag<TMemoryTag>(MemoryTagLiteral), TMemoryTag{1}); + ASSERT_FALSE(traceContext->GetAllocationTags().empty()); +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NTracing diff --git a/yt/yt/core/tracing/unittests/ya.make b/yt/yt/core/tracing/unittests/ya.make new file mode 100644 index 00000000000..1b64215672d --- /dev/null +++ b/yt/yt/core/tracing/unittests/ya.make @@ -0,0 +1,18 @@ +GTEST(unittester-core-tracing) + +INCLUDE(${ARCADIA_ROOT}/yt/ya_cpp.make.inc) + +SRCS( + allocation_tags_ut.cpp +) + +INCLUDE(${ARCADIA_ROOT}/yt/opensource_tests.inc) + +PEERDIR( + yt/yt/core + yt/yt/core/test_framework +) + +SIZE(SMALL) + +END() diff --git a/yt/yt/core/ya.make b/yt/yt/core/ya.make index 884956c8304..6478c3f2ff4 100644 --- a/yt/yt/core/ya.make +++ b/yt/yt/core/ya.make @@ -367,6 +367,7 @@ RECURSE_FOR_TESTS( http/unittests misc/unittests net/unittests + tracing/unittests yson/unittests http/mock net/mock |