aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorrobot-piglet <robot-piglet@yandex-team.com>2024-10-08 14:29:52 +0300
committerrobot-piglet <robot-piglet@yandex-team.com>2024-10-08 14:38:09 +0300
commit37e325a9a8628ece2764f0f26b09ebc09ca39814 (patch)
treee037eb46f3edc99649ed139e66e4662d7b5d5a90
parent71d6503c3f395e6f02f2d526262b42e32002c78e (diff)
downloadydb-37e325a9a8628ece2764f0f26b09ebc09ca39814.tar.gz
Intermediate changes
commit_hash:26f232484de806b5e8d9650d8690d9033c889924
-rw-r--r--yt/yt/library/tracing/example/main.cpp3
-rw-r--r--yt/yt/library/tracing/jaeger/private.h5
-rw-r--r--yt/yt/library/tracing/jaeger/tracer.cpp38
-rw-r--r--yt/yt/library/tracing/jaeger/tracer.h51
4 files changed, 51 insertions, 46 deletions
diff --git a/yt/yt/library/tracing/example/main.cpp b/yt/yt/library/tracing/example/main.cpp
index d2bbcb1972..300532087d 100644
--- a/yt/yt/library/tracing/example/main.cpp
+++ b/yt/yt/library/tracing/example/main.cpp
@@ -8,8 +8,6 @@
#include <util/system/env.h>
-#include <random>
-
using namespace NYT;
using namespace NYT::NTracing;
@@ -99,7 +97,6 @@ NAuth::TTvmServiceConfigPtr GetTvmConfig()
int main(int argc, char* argv[])
{
try {
-
bool test = false;
auto usage = Format("usage: %v [--test] COLLECTOR_ENDPOINTS", argv[0]);
diff --git a/yt/yt/library/tracing/jaeger/private.h b/yt/yt/library/tracing/jaeger/private.h
index f4bc091225..17ac2b530d 100644
--- a/yt/yt/library/tracing/jaeger/private.h
+++ b/yt/yt/library/tracing/jaeger/private.h
@@ -17,4 +17,9 @@ YT_DEFINE_GLOBAL(const NProfiling::TProfiler, TracingProfiler, "/tracing");
////////////////////////////////////////////////////////////////////////////////
+DECLARE_REFCOUNTED_CLASS(TJaegerTracer)
+DECLARE_REFCOUNTED_CLASS(TJaegerChannelManager)
+
+////////////////////////////////////////////////////////////////////////////////
+
} // namespace NYT::NTracing
diff --git a/yt/yt/library/tracing/jaeger/tracer.cpp b/yt/yt/library/tracing/jaeger/tracer.cpp
index 1c02cee3a6..d9958ba811 100644
--- a/yt/yt/library/tracing/jaeger/tracer.cpp
+++ b/yt/yt/library/tracing/jaeger/tracer.cpp
@@ -38,8 +38,8 @@ using namespace NAuth;
////////////////////////////////////////////////////////////////////////////////
-static const auto& Logger = JaegerLogger;
-static const auto& Profiler = TracingProfiler;
+static constexpr auto& Logger = JaegerLogger;
+static constexpr auto& Profiler = TracingProfiler;
////////////////////////////////////////////////////////////////////////////////
@@ -50,7 +50,8 @@ static const TString TracingServiceAlias = "tracing";
void TJaegerTracerDynamicConfig::Register(TRegistrar registrar)
{
- registrar.Parameter("collector_channel_config", &TThis::CollectorChannelConfig)
+ registrar.Parameter("collector_channel", &TThis::CollectorChannel)
+ .Alias("collector_channel_config")
.Optional();
registrar.Parameter("max_request_size", &TThis::MaxRequestSize)
.Default();
@@ -110,8 +111,8 @@ TJaegerTracerConfigPtr TJaegerTracerConfig::ApplyDynamic(const TJaegerTracerDyna
{
auto config = New<TJaegerTracerConfig>();
config->CollectorChannelConfig = CollectorChannelConfig;
- if (dynamicConfig->CollectorChannelConfig) {
- config->CollectorChannelConfig = dynamicConfig->CollectorChannelConfig;
+ if (dynamicConfig->CollectorChannel) {
+ config->CollectorChannelConfig = dynamicConfig->CollectorChannel;
}
config->FlushPeriod = dynamicConfig->FlushPeriod.value_or(FlushPeriod);
@@ -231,9 +232,6 @@ std::vector<TK> ExtractKeys(THashMap<TK, TV> const& inputMap) {
////////////////////////////////////////////////////////////////////////////////
-TBatchInfo::TBatchInfo()
-{ }
-
TBatchInfo::TBatchInfo(const TString& endpoint)
: TracesDequeued_(Profiler().WithTag("endpoint", endpoint).Counter("/traces_dequeued"))
, TracesDropped_(Profiler().WithTag("endpoint", endpoint).Counter("/traces_dropped"))
@@ -383,14 +381,14 @@ TInstant TJaegerChannelManager::GetReopenTime()
}
TJaegerTracer::TJaegerTracer(
- const TJaegerTracerConfigPtr& config)
+ TJaegerTracerConfigPtr config)
: ActionQueue_(New<TActionQueue>("Jaeger"))
, FlushExecutor_(New<TPeriodicExecutor>(
ActionQueue_->GetInvoker(),
- BIND(&TJaegerTracer::Flush, MakeStrong(this)),
+ BIND(&TJaegerTracer::DoFlush, MakeStrong(this)),
config->FlushPeriod))
- , Config_(config)
, TvmService_(config->TvmService ? CreateTvmService(config->TvmService) : nullptr)
+ , Config_(std::move(config))
{
Profiler().AddFuncGauge("/enabled", MakeStrong(this), [this] {
return Config_.Acquire()->IsEnabled();
@@ -553,10 +551,11 @@ void TJaegerTracer::DropFullQueue()
}
}
-void TJaegerTracer::Flush()
+void TJaegerTracer::DoFlush()
{
YT_LOG_DEBUG("Started span flush");
+
auto config = Config_.Acquire();
auto flushStartTime = TInstant::Now();
@@ -564,7 +563,7 @@ void TJaegerTracer::Flush()
if (OpenChannelConfig_ != config->CollectorChannelConfig) {
OpenChannelConfig_ = config->CollectorChannelConfig;
for (auto& [endpoint, channel] : CollectorChannels_) {
- CollectorChannels_[endpoint].ForceReset(flushStartTime);
+ CollectorChannels_[endpoint]->ForceReset(flushStartTime);
}
}
@@ -582,9 +581,7 @@ void TJaegerTracer::Flush()
return;
}
- std::stack<TString> toRemove;
auto keys = ExtractKeys(BatchInfo_);
-
if (keys.empty()) {
YT_LOG_DEBUG("Span batch info is empty");
LastSuccessfulFlushTime_ = flushStartTime;
@@ -592,10 +589,11 @@ void TJaegerTracer::Flush()
return;
}
+ std::stack<TString> toRemove;
for (const auto& endpoint : keys) {
auto [batches, batchCount, spanCount] = PeekQueue(config, endpoint);
if (batchCount <= 0) {
- if (!CollectorChannels_.contains(endpoint) || flushStartTime > CollectorChannels_[endpoint].GetReopenTime() + config->EndpointChannelTimeout) {
+ if (!CollectorChannels_.contains(endpoint) || flushStartTime > CollectorChannels_[endpoint]->GetReopenTime() + config->EndpointChannelTimeout) {
toRemove.push(endpoint);
}
YT_LOG_DEBUG("Span queue is empty (Endpoint: %v)", endpoint);
@@ -614,16 +612,16 @@ void TJaegerTracer::Flush()
auto it = CollectorChannels_.find(endpoint);
if (it == CollectorChannels_.end()) {
- it = CollectorChannels_.emplace(endpoint, TJaegerChannelManager(config, endpoint, TvmService_)).first;
+ it = CollectorChannels_.emplace(endpoint, New<TJaegerChannelManager>(config, endpoint, TvmService_)).first;
}
auto& channel = it->second;
- if (channel.NeedsReopen(flushStartTime)) {
- channel = TJaegerChannelManager(config, endpoint, TvmService_);
+ if (channel->NeedsReopen(flushStartTime)) {
+ channel = New<TJaegerChannelManager>(config, endpoint, TvmService_);
}
- if (channel.Push(batches, spanCount)) {
+ if (channel->Push(batches, spanCount)) {
DropQueue(batchCount, endpoint);
YT_LOG_DEBUG("Spans sent (Endpoint: %v)", endpoint);
LastSuccessfulFlushTime_ = flushStartTime;
diff --git a/yt/yt/library/tracing/jaeger/tracer.h b/yt/yt/library/tracing/jaeger/tracer.h
index 02a71b3cac..ed4850b1e9 100644
--- a/yt/yt/library/tracing/jaeger/tracer.h
+++ b/yt/yt/library/tracing/jaeger/tracer.h
@@ -1,6 +1,6 @@
#pragma once
-#include "public.h"
+#include "private.h"
#include <yt/yt/library/tracing/tracer.h>
@@ -28,7 +28,7 @@ class TJaegerTracerDynamicConfig
: public NYTree::TYsonStruct
{
public:
- NRpc::NGrpc::TChannelConfigPtr CollectorChannelConfig;
+ NRpc::NGrpc::TChannelConfigPtr CollectorChannel;
std::optional<i64> MaxRequestSize;
@@ -98,12 +98,10 @@ DEFINE_REFCOUNTED_TYPE(TJaegerTracerConfig)
////////////////////////////////////////////////////////////////////////////////
-DECLARE_REFCOUNTED_CLASS(TJaegerTracer)
-
class TBatchInfo
{
public:
- TBatchInfo();
+ TBatchInfo() = default;
explicit TBatchInfo(const TString& endpoint);
void PopFront();
@@ -113,18 +111,21 @@ public:
std::tuple<std::vector<TSharedRef>, int, int> PeekQueue(const TJaegerTracerConfigPtr& config, std::optional<TSharedRef> processInfo);
private:
+ const NProfiling::TCounter TracesDequeued_;
+ const NProfiling::TCounter TracesDropped_;
+ const NProfiling::TGauge MemoryUsage_;
+ const NProfiling::TGauge TraceQueueSize_;
+
std::deque<std::pair<int, TSharedRef>> BatchQueue_;
i64 QueueMemory_ = 0;
i64 QueueSize_ = 0;
-
- NProfiling::TCounter TracesDequeued_;
- NProfiling::TCounter TracesDropped_;
- NProfiling::TGauge MemoryUsage_;
- NProfiling::TGauge TraceQueueSize_;
};
+////////////////////////////////////////////////////////////////////////////////
+
class TJaegerChannelManager
+ : public TRefCounted
{
public:
TJaegerChannelManager();
@@ -140,25 +141,29 @@ public:
TInstant GetReopenTime();
private:
- NRpc::IChannelPtr Channel_;
- NAuth::ITvmServicePtr TvmService_;
+ const NAuth::ITvmServicePtr TvmService_;
+ const TString Endpoint_;
- TString Endpoint_;
+ const TInstant ReopenTime_;
+ const TDuration RpcTimeout_;
- TInstant ReopenTime_;
- TDuration RpcTimeout_;
+ const NProfiling::TCounter PushedBytes_;
+ const NProfiling::TCounter PushErrors_;
+ const NProfiling::TSummary PayloadSize_;
+ const NProfiling::TEventTimer PushDuration_;
- NProfiling::TCounter PushedBytes_;
- NProfiling::TCounter PushErrors_;
- NProfiling::TSummary PayloadSize_;
- NProfiling::TEventTimer PushDuration_;
+ NRpc::IChannelPtr Channel_;
};
+DEFINE_REFCOUNTED_TYPE(TJaegerChannelManager)
+
+////////////////////////////////////////////////////////////////////////////////
+
class TJaegerTracer
: public ITracer
{
public:
- TJaegerTracer(const TJaegerTracerConfigPtr& config);
+ explicit TJaegerTracer(TJaegerTracerConfigPtr config);
TFuture<void> WaitFlush();
@@ -171,6 +176,7 @@ public:
private:
const NConcurrency::TActionQueuePtr ActionQueue_;
const NConcurrency::TPeriodicExecutorPtr FlushExecutor_;
+ const NAuth::ITvmServicePtr TvmService_;
TAtomicIntrusivePtr<TJaegerTracerConfig> Config_;
@@ -184,12 +190,11 @@ private:
TAtomicObject<TPromise<void>> QueueEmptyPromise_ = NewPromise<void>();
- THashMap<TString, TJaegerChannelManager> CollectorChannels_;
+ THashMap<TString, TJaegerChannelManagerPtr> CollectorChannels_;
NRpc::NGrpc::TChannelConfigPtr OpenChannelConfig_;
- NAuth::ITvmServicePtr TvmService_;
- void Flush();
+ void DoFlush();
void DequeueAll(const TJaegerTracerConfigPtr& config);
void NotifyEmptyQueue();