aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorbabenko <babenko@yandex-team.com>2024-07-02 08:22:38 +0300
committerbabenko <babenko@yandex-team.com>2024-07-02 08:32:26 +0300
commit6ab551225dba20153d9ab995eac7c83d49887549 (patch)
treeded6284f11f4406be10eb82b2177153ea0e7e08c
parent318b857015da326b953274ecd06f29565f6f53ae (diff)
downloadydb-6ab551225dba20153d9ab995eac7c83d49887549.tar.gz
Refactor trace context finish
72f57149f00e878b81568658fa717c2074d0d178
-rw-r--r--yt/yt/core/tracing/trace_context.cpp110
-rw-r--r--yt/yt/core/tracing/trace_context.h32
2 files changed, 68 insertions, 74 deletions
diff --git a/yt/yt/core/tracing/trace_context.cpp b/yt/yt/core/tracing/trace_context.cpp
index 0cbb7d828b..a792c6397a 100644
--- a/yt/yt/core/tracing/trace_context.cpp
+++ b/yt/yt/core/tracing/trace_context.cpp
@@ -255,7 +255,6 @@ TTraceContext::TTraceContext(
, State_(parentTraceContext
? parentTraceContext->State_.load()
: (parentSpanContext.Sampled ? ETraceContextState::Sampled : ETraceContextState::Disabled))
- , Propagated_(true)
, ParentContext_(std::move(parentTraceContext))
, SpanName_(std::move(spanName))
, RequestId_(ParentContext_ ? ParentContext_->GetRequestId() : TRequestId{})
@@ -355,8 +354,8 @@ void TTraceContext::SetAllocationTags(TAllocationTags::TTags&& tags)
void TTraceContext::SetRecorded()
{
- auto disabled = ETraceContextState::Disabled;
- State_.compare_exchange_strong(disabled, ETraceContextState::Recorded);
+ auto expected = ETraceContextState::Disabled;
+ State_.compare_exchange_strong(expected, ETraceContextState::Recorded);
}
void TTraceContext::SetPropagated(bool value)
@@ -395,11 +394,7 @@ TDuration TTraceContext::GetElapsedTime() const
void TTraceContext::SetSampled(bool value)
{
- if (!value) {
- State_ = ETraceContextState::Disabled;
- } else {
- State_ = ETraceContextState::Sampled;
- }
+ State_ = value ? ETraceContextState::Sampled : ETraceContextState::Disabled;
}
TInstant TTraceContext::GetStartTime() const
@@ -409,8 +404,9 @@ TInstant TTraceContext::GetStartTime() const
TDuration TTraceContext::GetDuration() const
{
- YT_ASSERT(Finished_.load());
- return NProfiling::CpuDurationToDuration(Duration_.load());
+ auto finishTime = FinishTime_.load();
+ YT_VERIFY(finishTime != 0);
+ return NProfiling::CpuDurationToDuration(finishTime - StartTime_);
}
TTraceContext::TTagList TTraceContext::GetTags() const
@@ -532,69 +528,73 @@ void TTraceContext::AddLogEntry(TCpuInstant at, TString message)
Logs_.push_back(TTraceLogEntry{at, std::move(message)});
}
-bool TTraceContext::IsFinished()
+bool TTraceContext::IsFinished() const
{
- return Finished_.load();
+ return FinishTime_.load() != 0;
}
bool TTraceContext::IsSampled() const
{
- auto traceContext = this;
- while (traceContext) {
- auto state = traceContext->State_.load(std::memory_order::relaxed);
- if (state == ETraceContextState::Sampled) {
- return true;
- } else if (state == ETraceContextState::Disabled) {
- return false;
+ auto* currentTraceContext = this;
+ while (currentTraceContext) {
+ switch (currentTraceContext->State_.load(std::memory_order::relaxed)) {
+ case ETraceContextState::Sampled:
+ return true;
+ case ETraceContextState::Disabled:
+ return false;
+ case ETraceContextState::Recorded:
+ break;
}
-
- traceContext = traceContext->ParentContext_.Get();
+ currentTraceContext = currentTraceContext->ParentContext_.Get();
}
return false;
}
-void TTraceContext::SetDuration()
-{
- if (Duration_.load() == 0) {
- Duration_ = GetCpuInstant() - StartTime_;
- }
-}
-
void TTraceContext::Finish()
{
- if (Finished_.exchange(true)) {
+ auto expectedFinishTime = TCpuInstant(0);
+ if (!FinishTime_.compare_exchange_strong(expectedFinishTime, GetCpuInstant())) {
return;
}
- SetDuration();
- auto state = State_.load(std::memory_order::relaxed);
- if (state == ETraceContextState::Disabled) {
- return;
- } else if (state == ETraceContextState::Sampled) {
- if (auto tracer = GetGlobalTracer(); tracer) {
- tracer->Enqueue(MakeStrong(this));
- }
- } else if (state == ETraceContextState::Recorded) {
- if (!IsSampled()) {
- return;
- }
+ switch (State_.load(std::memory_order::relaxed)) {
+ case ETraceContextState::Disabled:
+ break;
- if (auto tracer = GetGlobalTracer(); tracer) {
- auto traceContext = this;
- while (traceContext) {
- if (traceContext->State_.load() != ETraceContextState::Recorded) {
- break;
- }
+ case ETraceContextState::Sampled:
+ if (auto tracer = GetGlobalTracer()) {
+ SubmitToTracer(tracer);
+ }
+ break;
- if (traceContext->Finished_.load() && !traceContext->Submitted_.exchange(true)) {
- traceContext->SetDuration();
- tracer->Enqueue(MakeStrong(traceContext));
- }
+ case ETraceContextState::Recorded:
+ if (!IsSampled()) {
+ break;
+ }
- traceContext = traceContext->ParentContext_.Get();
+ if (auto tracer = GetGlobalTracer()) {
+ auto* currentTraceContext = this;
+ while (currentTraceContext) {
+ if (currentTraceContext->State_.load() != ETraceContextState::Recorded) {
+ break;
+ }
+
+ if (currentTraceContext->IsFinished()) {
+ currentTraceContext->SubmitToTracer(tracer);
+ }
+
+ currentTraceContext = currentTraceContext->ParentContext_.Get();
+ }
}
- }
+ break;
+ }
+}
+
+void TTraceContext::SubmitToTracer(const ITracerPtr& tracer)
+{
+ if (!Submitted_.exchange(true)) {
+ tracer->Enqueue(this);
}
}
@@ -704,8 +704,8 @@ TTraceContextPtr TTraceContext::NewChildFromRpc(
void TTraceContext::IncrementElapsedCpuTime(NProfiling::TCpuDuration delta)
{
- for (auto* current = this; current; current = current->ParentContext_.Get()) {
- current->ElapsedCpuTime_ += delta;
+ for (auto* currentTraceContext = this; currentTraceContext; currentTraceContext = currentTraceContext->ParentContext_.Get()) {
+ currentTraceContext->ElapsedCpuTime_ += delta;
}
}
diff --git a/yt/yt/core/tracing/trace_context.h b/yt/yt/core/tracing/trace_context.h
index 01a03da443..c901dc1222 100644
--- a/yt/yt/core/tracing/trace_context.h
+++ b/yt/yt/core/tracing/trace_context.h
@@ -55,7 +55,7 @@ TTracingTransportConfigPtr GetTracingTransportConfig();
DEFINE_ENUM(ETraceContextState,
(Disabled) // Used to propagate TraceId, RequestId and LoggingTag.
(Recorded) // May be sampled later.
- (Sampled) // Sampled and will be reported to jaeger.
+ (Sampled) // Sampled and will be reported to tracer.
);
////////////////////////////////////////////////////////////////////////////////
@@ -67,7 +67,7 @@ DEFINE_ENUM(ETraceContextState,
* 1) TraceId, RequestId and LoggingTag are recorded inside trace context and
* passed to logger.
* 2) ElapsedCpu time is tracked by fiber scheduler during context switch.
- * 3) Opentracing compatible information is recorded and later pushed to jaeger.
+ * 3) Opentracing compatible information is recorded and later pushed to tracer.
*
* TTraceContext objects within a single process form a tree.
*
@@ -79,14 +79,15 @@ class TTraceContext
: public TRefCounted
{
public:
+ //! Returns the flag indicating that this trace is finished (via call to #Finish).
+ bool IsFinished() const;
//! Finalizes and publishes the context (if sampling is enabled).
/*!
* Safe to call multiple times from arbitrary threads; only the first call matters.
*/
void Finish();
- bool IsFinished();
- //! IsRecorded returns a flag indicating that this trace may be sent to jaeger.
+ //! Returns the flag indicating that this trace may be sent to tracer.
/*!
* This flag should be used for fast-path optimization to skip trace annotation and child span creation.
*/
@@ -96,7 +97,7 @@ public:
bool IsSampled() const;
void SetSampled(bool value = true);
- //! IsPropagated returns a flag indicating that trace is serialized to proto.
+ //! Returns the flag indicating that trace is serialized to protobuf.
/*!
* By default trace context is propagated.
* Not thread-safe.
@@ -126,23 +127,18 @@ public:
TRequestId GetRequestId() const;
void SetAllocationTags(TAllocationTags::TTags&& tags);
-
TAllocationTags::TTags GetAllocationTags() const;
TAllocationTagsPtr GetAllocationTagsPtr() const noexcept;
-
void SetAllocationTagsPtr(TAllocationTagsPtr allocationTags) noexcept;
-
void ClearAllocationTagsPtr() noexcept;
template <typename TTag>
std::optional<TTag> FindAllocationTag(const TString& key) const;
-
template <typename TTag>
std::optional<TTag> SetAllocationTag(
const TString& key,
TTag value);
-
template <typename TTag>
std::optional<TTag> RemoveAllocationTag(const TString& key);
@@ -175,7 +171,7 @@ public:
template <class T>
void AddTag(const TString& tagName, const T& tagValue);
- //! Adds error tag. Spans containing errors are highlighted in Jaeger UI.
+ //! Adds error tag. Spans containing errors are highlighted in tracing UI.
void AddErrorTag();
struct TTraceLogEntry
@@ -221,12 +217,11 @@ private:
const TTraceId TraceId_;
const TSpanId SpanId_;
const TSpanId ParentSpanId_;
-
- // Right now, debug flag is just passed as-is. It is part of opentracing, but we do not interpret it in any way.
+ // Right now, debug flag is just passed as-is. It is part of OpenTracing, but we do not interpret it in any way.
const bool Debug_;
- mutable std::atomic<ETraceContextState> State_;
- bool Propagated_;
+ std::atomic<ETraceContextState> State_;
+ bool Propagated_ = true;
const TTraceContextPtr ParentContext_;
const TString SpanName_;
@@ -237,8 +232,7 @@ private:
std::atomic<bool> Finished_ = false;
std::atomic<bool> Submitted_ = false;
- std::atomic<NProfiling::TCpuDuration> Duration_ = {0};
-
+ std::atomic<NProfiling::TCpuInstant> FinishTime_ = 0;
std::atomic<NProfiling::TCpuDuration> ElapsedCpuTime_ = 0;
YT_DECLARE_SPIN_LOCK(NThreading::TSpinLock, Lock_);
@@ -260,8 +254,6 @@ private:
TTraceContextPtr parentTraceContext = nullptr);
DECLARE_NEW_FRIEND()
- void SetDuration();
-
void DoSetAllocationTags(TAllocationTags::TTags&& tags);
template <typename TTag>
@@ -271,6 +263,8 @@ private:
template <typename TTag>
std::optional<TTag> DoFindAllocationTag(const TString& key) const;
+
+ void SubmitToTracer(const ITracerPtr& tracer);
};
DEFINE_REFCOUNTED_TYPE(TTraceContext)