diff options
author | babenko <babenko@yandex-team.com> | 2024-07-02 08:22:38 +0300 |
---|---|---|
committer | babenko <babenko@yandex-team.com> | 2024-07-02 08:32:26 +0300 |
commit | 6ab551225dba20153d9ab995eac7c83d49887549 (patch) | |
tree | ded6284f11f4406be10eb82b2177153ea0e7e08c | |
parent | 318b857015da326b953274ecd06f29565f6f53ae (diff) | |
download | ydb-6ab551225dba20153d9ab995eac7c83d49887549.tar.gz |
Refactor trace context finish
72f57149f00e878b81568658fa717c2074d0d178
-rw-r--r-- | yt/yt/core/tracing/trace_context.cpp | 110 | ||||
-rw-r--r-- | yt/yt/core/tracing/trace_context.h | 32 |
2 files changed, 68 insertions, 74 deletions
diff --git a/yt/yt/core/tracing/trace_context.cpp b/yt/yt/core/tracing/trace_context.cpp index 0cbb7d828b..a792c6397a 100644 --- a/yt/yt/core/tracing/trace_context.cpp +++ b/yt/yt/core/tracing/trace_context.cpp @@ -255,7 +255,6 @@ TTraceContext::TTraceContext( , State_(parentTraceContext ? parentTraceContext->State_.load() : (parentSpanContext.Sampled ? ETraceContextState::Sampled : ETraceContextState::Disabled)) - , Propagated_(true) , ParentContext_(std::move(parentTraceContext)) , SpanName_(std::move(spanName)) , RequestId_(ParentContext_ ? ParentContext_->GetRequestId() : TRequestId{}) @@ -355,8 +354,8 @@ void TTraceContext::SetAllocationTags(TAllocationTags::TTags&& tags) void TTraceContext::SetRecorded() { - auto disabled = ETraceContextState::Disabled; - State_.compare_exchange_strong(disabled, ETraceContextState::Recorded); + auto expected = ETraceContextState::Disabled; + State_.compare_exchange_strong(expected, ETraceContextState::Recorded); } void TTraceContext::SetPropagated(bool value) @@ -395,11 +394,7 @@ TDuration TTraceContext::GetElapsedTime() const void TTraceContext::SetSampled(bool value) { - if (!value) { - State_ = ETraceContextState::Disabled; - } else { - State_ = ETraceContextState::Sampled; - } + State_ = value ? ETraceContextState::Sampled : ETraceContextState::Disabled; } TInstant TTraceContext::GetStartTime() const @@ -409,8 +404,9 @@ TInstant TTraceContext::GetStartTime() const TDuration TTraceContext::GetDuration() const { - YT_ASSERT(Finished_.load()); - return NProfiling::CpuDurationToDuration(Duration_.load()); + auto finishTime = FinishTime_.load(); + YT_VERIFY(finishTime != 0); + return NProfiling::CpuDurationToDuration(finishTime - StartTime_); } TTraceContext::TTagList TTraceContext::GetTags() const @@ -532,69 +528,73 @@ void TTraceContext::AddLogEntry(TCpuInstant at, TString message) Logs_.push_back(TTraceLogEntry{at, std::move(message)}); } -bool TTraceContext::IsFinished() +bool TTraceContext::IsFinished() const { - return Finished_.load(); + return FinishTime_.load() != 0; } bool TTraceContext::IsSampled() const { - auto traceContext = this; - while (traceContext) { - auto state = traceContext->State_.load(std::memory_order::relaxed); - if (state == ETraceContextState::Sampled) { - return true; - } else if (state == ETraceContextState::Disabled) { - return false; + auto* currentTraceContext = this; + while (currentTraceContext) { + switch (currentTraceContext->State_.load(std::memory_order::relaxed)) { + case ETraceContextState::Sampled: + return true; + case ETraceContextState::Disabled: + return false; + case ETraceContextState::Recorded: + break; } - - traceContext = traceContext->ParentContext_.Get(); + currentTraceContext = currentTraceContext->ParentContext_.Get(); } return false; } -void TTraceContext::SetDuration() -{ - if (Duration_.load() == 0) { - Duration_ = GetCpuInstant() - StartTime_; - } -} - void TTraceContext::Finish() { - if (Finished_.exchange(true)) { + auto expectedFinishTime = TCpuInstant(0); + if (!FinishTime_.compare_exchange_strong(expectedFinishTime, GetCpuInstant())) { return; } - SetDuration(); - auto state = State_.load(std::memory_order::relaxed); - if (state == ETraceContextState::Disabled) { - return; - } else if (state == ETraceContextState::Sampled) { - if (auto tracer = GetGlobalTracer(); tracer) { - tracer->Enqueue(MakeStrong(this)); - } - } else if (state == ETraceContextState::Recorded) { - if (!IsSampled()) { - return; - } + switch (State_.load(std::memory_order::relaxed)) { + case ETraceContextState::Disabled: + break; - if (auto tracer = GetGlobalTracer(); tracer) { - auto traceContext = this; - while (traceContext) { - if (traceContext->State_.load() != ETraceContextState::Recorded) { - break; - } + case ETraceContextState::Sampled: + if (auto tracer = GetGlobalTracer()) { + SubmitToTracer(tracer); + } + break; - if (traceContext->Finished_.load() && !traceContext->Submitted_.exchange(true)) { - traceContext->SetDuration(); - tracer->Enqueue(MakeStrong(traceContext)); - } + case ETraceContextState::Recorded: + if (!IsSampled()) { + break; + } - traceContext = traceContext->ParentContext_.Get(); + if (auto tracer = GetGlobalTracer()) { + auto* currentTraceContext = this; + while (currentTraceContext) { + if (currentTraceContext->State_.load() != ETraceContextState::Recorded) { + break; + } + + if (currentTraceContext->IsFinished()) { + currentTraceContext->SubmitToTracer(tracer); + } + + currentTraceContext = currentTraceContext->ParentContext_.Get(); + } } - } + break; + } +} + +void TTraceContext::SubmitToTracer(const ITracerPtr& tracer) +{ + if (!Submitted_.exchange(true)) { + tracer->Enqueue(this); } } @@ -704,8 +704,8 @@ TTraceContextPtr TTraceContext::NewChildFromRpc( void TTraceContext::IncrementElapsedCpuTime(NProfiling::TCpuDuration delta) { - for (auto* current = this; current; current = current->ParentContext_.Get()) { - current->ElapsedCpuTime_ += delta; + for (auto* currentTraceContext = this; currentTraceContext; currentTraceContext = currentTraceContext->ParentContext_.Get()) { + currentTraceContext->ElapsedCpuTime_ += delta; } } diff --git a/yt/yt/core/tracing/trace_context.h b/yt/yt/core/tracing/trace_context.h index 01a03da443..c901dc1222 100644 --- a/yt/yt/core/tracing/trace_context.h +++ b/yt/yt/core/tracing/trace_context.h @@ -55,7 +55,7 @@ TTracingTransportConfigPtr GetTracingTransportConfig(); DEFINE_ENUM(ETraceContextState, (Disabled) // Used to propagate TraceId, RequestId and LoggingTag. (Recorded) // May be sampled later. - (Sampled) // Sampled and will be reported to jaeger. + (Sampled) // Sampled and will be reported to tracer. ); //////////////////////////////////////////////////////////////////////////////// @@ -67,7 +67,7 @@ DEFINE_ENUM(ETraceContextState, * 1) TraceId, RequestId and LoggingTag are recorded inside trace context and * passed to logger. * 2) ElapsedCpu time is tracked by fiber scheduler during context switch. - * 3) Opentracing compatible information is recorded and later pushed to jaeger. + * 3) Opentracing compatible information is recorded and later pushed to tracer. * * TTraceContext objects within a single process form a tree. * @@ -79,14 +79,15 @@ class TTraceContext : public TRefCounted { public: + //! Returns the flag indicating that this trace is finished (via call to #Finish). + bool IsFinished() const; //! Finalizes and publishes the context (if sampling is enabled). /*! * Safe to call multiple times from arbitrary threads; only the first call matters. */ void Finish(); - bool IsFinished(); - //! IsRecorded returns a flag indicating that this trace may be sent to jaeger. + //! Returns the flag indicating that this trace may be sent to tracer. /*! * This flag should be used for fast-path optimization to skip trace annotation and child span creation. */ @@ -96,7 +97,7 @@ public: bool IsSampled() const; void SetSampled(bool value = true); - //! IsPropagated returns a flag indicating that trace is serialized to proto. + //! Returns the flag indicating that trace is serialized to protobuf. /*! * By default trace context is propagated. * Not thread-safe. @@ -126,23 +127,18 @@ public: TRequestId GetRequestId() const; void SetAllocationTags(TAllocationTags::TTags&& tags); - TAllocationTags::TTags GetAllocationTags() const; TAllocationTagsPtr GetAllocationTagsPtr() const noexcept; - void SetAllocationTagsPtr(TAllocationTagsPtr allocationTags) noexcept; - void ClearAllocationTagsPtr() noexcept; template <typename TTag> std::optional<TTag> FindAllocationTag(const TString& key) const; - template <typename TTag> std::optional<TTag> SetAllocationTag( const TString& key, TTag value); - template <typename TTag> std::optional<TTag> RemoveAllocationTag(const TString& key); @@ -175,7 +171,7 @@ public: template <class T> void AddTag(const TString& tagName, const T& tagValue); - //! Adds error tag. Spans containing errors are highlighted in Jaeger UI. + //! Adds error tag. Spans containing errors are highlighted in tracing UI. void AddErrorTag(); struct TTraceLogEntry @@ -221,12 +217,11 @@ private: const TTraceId TraceId_; const TSpanId SpanId_; const TSpanId ParentSpanId_; - - // Right now, debug flag is just passed as-is. It is part of opentracing, but we do not interpret it in any way. + // Right now, debug flag is just passed as-is. It is part of OpenTracing, but we do not interpret it in any way. const bool Debug_; - mutable std::atomic<ETraceContextState> State_; - bool Propagated_; + std::atomic<ETraceContextState> State_; + bool Propagated_ = true; const TTraceContextPtr ParentContext_; const TString SpanName_; @@ -237,8 +232,7 @@ private: std::atomic<bool> Finished_ = false; std::atomic<bool> Submitted_ = false; - std::atomic<NProfiling::TCpuDuration> Duration_ = {0}; - + std::atomic<NProfiling::TCpuInstant> FinishTime_ = 0; std::atomic<NProfiling::TCpuDuration> ElapsedCpuTime_ = 0; YT_DECLARE_SPIN_LOCK(NThreading::TSpinLock, Lock_); @@ -260,8 +254,6 @@ private: TTraceContextPtr parentTraceContext = nullptr); DECLARE_NEW_FRIEND() - void SetDuration(); - void DoSetAllocationTags(TAllocationTags::TTags&& tags); template <typename TTag> @@ -271,6 +263,8 @@ private: template <typename TTag> std::optional<TTag> DoFindAllocationTag(const TString& key) const; + + void SubmitToTracer(const ITracerPtr& tracer); }; DEFINE_REFCOUNTED_TYPE(TTraceContext) |