diff options
author | robot-piglet <robot-piglet@yandex-team.com> | 2024-10-08 14:29:52 +0300 |
---|---|---|
committer | robot-piglet <robot-piglet@yandex-team.com> | 2024-10-08 14:38:09 +0300 |
commit | 37e325a9a8628ece2764f0f26b09ebc09ca39814 (patch) | |
tree | e037eb46f3edc99649ed139e66e4662d7b5d5a90 | |
parent | 71d6503c3f395e6f02f2d526262b42e32002c78e (diff) | |
download | ydb-37e325a9a8628ece2764f0f26b09ebc09ca39814.tar.gz |
Intermediate changes
commit_hash:26f232484de806b5e8d9650d8690d9033c889924
-rw-r--r-- | yt/yt/library/tracing/example/main.cpp | 3 | ||||
-rw-r--r-- | yt/yt/library/tracing/jaeger/private.h | 5 | ||||
-rw-r--r-- | yt/yt/library/tracing/jaeger/tracer.cpp | 38 | ||||
-rw-r--r-- | yt/yt/library/tracing/jaeger/tracer.h | 51 |
4 files changed, 51 insertions, 46 deletions
diff --git a/yt/yt/library/tracing/example/main.cpp b/yt/yt/library/tracing/example/main.cpp index d2bbcb1972..300532087d 100644 --- a/yt/yt/library/tracing/example/main.cpp +++ b/yt/yt/library/tracing/example/main.cpp @@ -8,8 +8,6 @@ #include <util/system/env.h> -#include <random> - using namespace NYT; using namespace NYT::NTracing; @@ -99,7 +97,6 @@ NAuth::TTvmServiceConfigPtr GetTvmConfig() int main(int argc, char* argv[]) { try { - bool test = false; auto usage = Format("usage: %v [--test] COLLECTOR_ENDPOINTS", argv[0]); diff --git a/yt/yt/library/tracing/jaeger/private.h b/yt/yt/library/tracing/jaeger/private.h index f4bc091225..17ac2b530d 100644 --- a/yt/yt/library/tracing/jaeger/private.h +++ b/yt/yt/library/tracing/jaeger/private.h @@ -17,4 +17,9 @@ YT_DEFINE_GLOBAL(const NProfiling::TProfiler, TracingProfiler, "/tracing"); //////////////////////////////////////////////////////////////////////////////// +DECLARE_REFCOUNTED_CLASS(TJaegerTracer) +DECLARE_REFCOUNTED_CLASS(TJaegerChannelManager) + +//////////////////////////////////////////////////////////////////////////////// + } // namespace NYT::NTracing diff --git a/yt/yt/library/tracing/jaeger/tracer.cpp b/yt/yt/library/tracing/jaeger/tracer.cpp index 1c02cee3a6..d9958ba811 100644 --- a/yt/yt/library/tracing/jaeger/tracer.cpp +++ b/yt/yt/library/tracing/jaeger/tracer.cpp @@ -38,8 +38,8 @@ using namespace NAuth; //////////////////////////////////////////////////////////////////////////////// -static const auto& Logger = JaegerLogger; -static const auto& Profiler = TracingProfiler; +static constexpr auto& Logger = JaegerLogger; +static constexpr auto& Profiler = TracingProfiler; //////////////////////////////////////////////////////////////////////////////// @@ -50,7 +50,8 @@ static const TString TracingServiceAlias = "tracing"; void TJaegerTracerDynamicConfig::Register(TRegistrar registrar) { - registrar.Parameter("collector_channel_config", &TThis::CollectorChannelConfig) + registrar.Parameter("collector_channel", &TThis::CollectorChannel) + .Alias("collector_channel_config") .Optional(); registrar.Parameter("max_request_size", &TThis::MaxRequestSize) .Default(); @@ -110,8 +111,8 @@ TJaegerTracerConfigPtr TJaegerTracerConfig::ApplyDynamic(const TJaegerTracerDyna { auto config = New<TJaegerTracerConfig>(); config->CollectorChannelConfig = CollectorChannelConfig; - if (dynamicConfig->CollectorChannelConfig) { - config->CollectorChannelConfig = dynamicConfig->CollectorChannelConfig; + if (dynamicConfig->CollectorChannel) { + config->CollectorChannelConfig = dynamicConfig->CollectorChannel; } config->FlushPeriod = dynamicConfig->FlushPeriod.value_or(FlushPeriod); @@ -231,9 +232,6 @@ std::vector<TK> ExtractKeys(THashMap<TK, TV> const& inputMap) { //////////////////////////////////////////////////////////////////////////////// -TBatchInfo::TBatchInfo() -{ } - TBatchInfo::TBatchInfo(const TString& endpoint) : TracesDequeued_(Profiler().WithTag("endpoint", endpoint).Counter("/traces_dequeued")) , TracesDropped_(Profiler().WithTag("endpoint", endpoint).Counter("/traces_dropped")) @@ -383,14 +381,14 @@ TInstant TJaegerChannelManager::GetReopenTime() } TJaegerTracer::TJaegerTracer( - const TJaegerTracerConfigPtr& config) + TJaegerTracerConfigPtr config) : ActionQueue_(New<TActionQueue>("Jaeger")) , FlushExecutor_(New<TPeriodicExecutor>( ActionQueue_->GetInvoker(), - BIND(&TJaegerTracer::Flush, MakeStrong(this)), + BIND(&TJaegerTracer::DoFlush, MakeStrong(this)), config->FlushPeriod)) - , Config_(config) , TvmService_(config->TvmService ? CreateTvmService(config->TvmService) : nullptr) + , Config_(std::move(config)) { Profiler().AddFuncGauge("/enabled", MakeStrong(this), [this] { return Config_.Acquire()->IsEnabled(); @@ -553,10 +551,11 @@ void TJaegerTracer::DropFullQueue() } } -void TJaegerTracer::Flush() +void TJaegerTracer::DoFlush() { YT_LOG_DEBUG("Started span flush"); + auto config = Config_.Acquire(); auto flushStartTime = TInstant::Now(); @@ -564,7 +563,7 @@ void TJaegerTracer::Flush() if (OpenChannelConfig_ != config->CollectorChannelConfig) { OpenChannelConfig_ = config->CollectorChannelConfig; for (auto& [endpoint, channel] : CollectorChannels_) { - CollectorChannels_[endpoint].ForceReset(flushStartTime); + CollectorChannels_[endpoint]->ForceReset(flushStartTime); } } @@ -582,9 +581,7 @@ void TJaegerTracer::Flush() return; } - std::stack<TString> toRemove; auto keys = ExtractKeys(BatchInfo_); - if (keys.empty()) { YT_LOG_DEBUG("Span batch info is empty"); LastSuccessfulFlushTime_ = flushStartTime; @@ -592,10 +589,11 @@ void TJaegerTracer::Flush() return; } + std::stack<TString> toRemove; for (const auto& endpoint : keys) { auto [batches, batchCount, spanCount] = PeekQueue(config, endpoint); if (batchCount <= 0) { - if (!CollectorChannels_.contains(endpoint) || flushStartTime > CollectorChannels_[endpoint].GetReopenTime() + config->EndpointChannelTimeout) { + if (!CollectorChannels_.contains(endpoint) || flushStartTime > CollectorChannels_[endpoint]->GetReopenTime() + config->EndpointChannelTimeout) { toRemove.push(endpoint); } YT_LOG_DEBUG("Span queue is empty (Endpoint: %v)", endpoint); @@ -614,16 +612,16 @@ void TJaegerTracer::Flush() auto it = CollectorChannels_.find(endpoint); if (it == CollectorChannels_.end()) { - it = CollectorChannels_.emplace(endpoint, TJaegerChannelManager(config, endpoint, TvmService_)).first; + it = CollectorChannels_.emplace(endpoint, New<TJaegerChannelManager>(config, endpoint, TvmService_)).first; } auto& channel = it->second; - if (channel.NeedsReopen(flushStartTime)) { - channel = TJaegerChannelManager(config, endpoint, TvmService_); + if (channel->NeedsReopen(flushStartTime)) { + channel = New<TJaegerChannelManager>(config, endpoint, TvmService_); } - if (channel.Push(batches, spanCount)) { + if (channel->Push(batches, spanCount)) { DropQueue(batchCount, endpoint); YT_LOG_DEBUG("Spans sent (Endpoint: %v)", endpoint); LastSuccessfulFlushTime_ = flushStartTime; diff --git a/yt/yt/library/tracing/jaeger/tracer.h b/yt/yt/library/tracing/jaeger/tracer.h index 02a71b3cac..ed4850b1e9 100644 --- a/yt/yt/library/tracing/jaeger/tracer.h +++ b/yt/yt/library/tracing/jaeger/tracer.h @@ -1,6 +1,6 @@ #pragma once -#include "public.h" +#include "private.h" #include <yt/yt/library/tracing/tracer.h> @@ -28,7 +28,7 @@ class TJaegerTracerDynamicConfig : public NYTree::TYsonStruct { public: - NRpc::NGrpc::TChannelConfigPtr CollectorChannelConfig; + NRpc::NGrpc::TChannelConfigPtr CollectorChannel; std::optional<i64> MaxRequestSize; @@ -98,12 +98,10 @@ DEFINE_REFCOUNTED_TYPE(TJaegerTracerConfig) //////////////////////////////////////////////////////////////////////////////// -DECLARE_REFCOUNTED_CLASS(TJaegerTracer) - class TBatchInfo { public: - TBatchInfo(); + TBatchInfo() = default; explicit TBatchInfo(const TString& endpoint); void PopFront(); @@ -113,18 +111,21 @@ public: std::tuple<std::vector<TSharedRef>, int, int> PeekQueue(const TJaegerTracerConfigPtr& config, std::optional<TSharedRef> processInfo); private: + const NProfiling::TCounter TracesDequeued_; + const NProfiling::TCounter TracesDropped_; + const NProfiling::TGauge MemoryUsage_; + const NProfiling::TGauge TraceQueueSize_; + std::deque<std::pair<int, TSharedRef>> BatchQueue_; i64 QueueMemory_ = 0; i64 QueueSize_ = 0; - - NProfiling::TCounter TracesDequeued_; - NProfiling::TCounter TracesDropped_; - NProfiling::TGauge MemoryUsage_; - NProfiling::TGauge TraceQueueSize_; }; +//////////////////////////////////////////////////////////////////////////////// + class TJaegerChannelManager + : public TRefCounted { public: TJaegerChannelManager(); @@ -140,25 +141,29 @@ public: TInstant GetReopenTime(); private: - NRpc::IChannelPtr Channel_; - NAuth::ITvmServicePtr TvmService_; + const NAuth::ITvmServicePtr TvmService_; + const TString Endpoint_; - TString Endpoint_; + const TInstant ReopenTime_; + const TDuration RpcTimeout_; - TInstant ReopenTime_; - TDuration RpcTimeout_; + const NProfiling::TCounter PushedBytes_; + const NProfiling::TCounter PushErrors_; + const NProfiling::TSummary PayloadSize_; + const NProfiling::TEventTimer PushDuration_; - NProfiling::TCounter PushedBytes_; - NProfiling::TCounter PushErrors_; - NProfiling::TSummary PayloadSize_; - NProfiling::TEventTimer PushDuration_; + NRpc::IChannelPtr Channel_; }; +DEFINE_REFCOUNTED_TYPE(TJaegerChannelManager) + +//////////////////////////////////////////////////////////////////////////////// + class TJaegerTracer : public ITracer { public: - TJaegerTracer(const TJaegerTracerConfigPtr& config); + explicit TJaegerTracer(TJaegerTracerConfigPtr config); TFuture<void> WaitFlush(); @@ -171,6 +176,7 @@ public: private: const NConcurrency::TActionQueuePtr ActionQueue_; const NConcurrency::TPeriodicExecutorPtr FlushExecutor_; + const NAuth::ITvmServicePtr TvmService_; TAtomicIntrusivePtr<TJaegerTracerConfig> Config_; @@ -184,12 +190,11 @@ private: TAtomicObject<TPromise<void>> QueueEmptyPromise_ = NewPromise<void>(); - THashMap<TString, TJaegerChannelManager> CollectorChannels_; + THashMap<TString, TJaegerChannelManagerPtr> CollectorChannels_; NRpc::NGrpc::TChannelConfigPtr OpenChannelConfig_; - NAuth::ITvmServicePtr TvmService_; - void Flush(); + void DoFlush(); void DequeueAll(const TJaegerTracerConfigPtr& config); void NotifyEmptyQueue(); |