aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorrobot-piglet <robot-piglet@yandex-team.com>2024-06-11 19:37:16 +0300
committerrobot-piglet <robot-piglet@yandex-team.com>2024-06-12 11:17:53 +0300
commitf916f06348b500917c61c50b571aaa9ee095f982 (patch)
tree564309cd4f0eb1d9cbbe4aa037c4cdcbee8b7bfc
parent61a0cd4e405691bb1309ada13b972303daa24a62 (diff)
downloadydb-f916f06348b500917c61c50b571aaa9ee095f982.tar.gz
Intermediate changes
-rw-r--r--yt/yt/library/profiling/solomon/exporter.cpp67
-rw-r--r--yt/yt/library/profiling/solomon/helpers.cpp78
-rw-r--r--yt/yt/library/profiling/solomon/helpers.h32
-rw-r--r--yt/yt/library/profiling/solomon/private.h2
-rw-r--r--yt/yt/library/profiling/solomon/proxy.cpp428
-rw-r--r--yt/yt/library/profiling/solomon/proxy.h118
-rw-r--r--yt/yt/library/profiling/solomon/proxy_example/main.cpp80
-rw-r--r--yt/yt/library/profiling/solomon/proxy_example/ya.make11
-rw-r--r--yt/yt/library/profiling/solomon/public.h5
-rw-r--r--yt/yt/library/profiling/solomon/ya.make6
10 files changed, 774 insertions, 53 deletions
diff --git a/yt/yt/library/profiling/solomon/exporter.cpp b/yt/yt/library/profiling/solomon/exporter.cpp
index ec85275605..0bbaab7745 100644
--- a/yt/yt/library/profiling/solomon/exporter.cpp
+++ b/yt/yt/library/profiling/solomon/exporter.cpp
@@ -1,6 +1,7 @@
#include "exporter.h"
#include "private.h"
#include "sensor_service.h"
+#include "helpers.h"
#include <yt/yt/build/build.h>
@@ -620,47 +621,8 @@ void TSolomonExporter::DoHandleShard(
auto Logger = NProfiling::Logger().WithTag("Shard: %v", name);
try {
- auto format = NMonitoring::EFormat::JSON;
- if (auto accept = req->GetHeaders()->Find("Accept")) {
- format = NMonitoring::FormatFromAcceptHeader(*accept);
- }
-
- NMonitoring::ECompression compression = NMonitoring::ECompression::IDENTITY;
- if (auto acceptEncoding = req->GetHeaders()->Find("Accept-Encoding")) {
- compression = NMonitoring::CompressionFromAcceptEncodingHeader(*acceptEncoding);
- if (compression == NMonitoring::ECompression::UNKNOWN) {
- // Fallback to identity if we cannot recognize the requested encoding.
- compression = NMonitoring::ECompression::IDENTITY;
- }
- }
-
- auto buffer = std::make_shared<TStringStream>();
- NMonitoring::IMetricEncoderPtr encoder;
- switch (format) {
- case NMonitoring::EFormat::UNKNOWN:
- case NMonitoring::EFormat::JSON:
- encoder = NMonitoring::BufferedEncoderJson(buffer.get());
- format = NMonitoring::EFormat::JSON;
- compression = NMonitoring::ECompression::IDENTITY;
- break;
-
- case NMonitoring::EFormat::SPACK:
- encoder = NMonitoring::EncoderSpackV1(
- buffer.get(),
- NMonitoring::ETimePrecision::SECONDS,
- compression);
- break;
-
- case NMonitoring::EFormat::PROMETHEUS:
- encoder = NMonitoring::EncoderPrometheus(buffer.get());
- break;
-
- default:
- THROW_ERROR_EXCEPTION("Unsupported format %Qv", NMonitoring::ContentTypeByFormat(format));
- }
-
- rsp->GetHeaders()->Set("Content-Type", TString{NMonitoring::ContentTypeByFormat(format)});
- rsp->GetHeaders()->Set("Content-Encoding", TString{NMonitoring::ContentEncodingByCompression(compression)});
+ auto outputEncodingContext = CreateOutputEncodingContextFromHeaders(req->GetHeaders());
+ FillResponseHeaders(outputEncodingContext, rsp->GetHeaders());
TCgiParameters params(req->GetUrl().RawQuery);
@@ -712,8 +674,8 @@ void TSolomonExporter::DoHandleShard(
if (now && period) {
cacheKey = TCacheKey{
.Shard = name,
- .Format = format,
- .Compression = compression,
+ .Format = outputEncodingContext.Format,
+ .Compression = outputEncodingContext.Compression,
.Now = *now,
.Period = *period,
.Grid = readGridStep,
@@ -722,8 +684,8 @@ void TSolomonExporter::DoHandleShard(
auto solomonCluster = req->GetHeaders()->Find("X-Solomon-ClusterId");
YT_LOG_DEBUG("Processing sensor pull (Format: %v, Compression: %v, SolomonCluster: %v, Now: %v, Period: %v, Grid: %v)",
- format,
- compression,
+ outputEncodingContext.Format,
+ outputEncodingContext.Compression,
solomonCluster ? *solomonCluster : "",
now,
period,
@@ -800,8 +762,7 @@ void TSolomonExporter::DoHandleShard(
options.Host = Config_->Host;
options.InstanceTags = std::vector<TTag>{Config_->InstanceTags.begin(), Config_->InstanceTags.end()};
- auto isSolomon = format == NMonitoring::EFormat::JSON || format == NMonitoring::EFormat::SPACK;
- if (Config_->ConvertCountersToRateForSolomon && isSolomon) {
+ if (Config_->ConvertCountersToRateForSolomon && outputEncodingContext.IsSolomonPull) {
options.ConvertCountersToRateGauge = true;
options.RenameConvertedCounters = Config_->RenameConvertedCounters;
@@ -811,7 +772,7 @@ void TSolomonExporter::DoHandleShard(
}
}
- options.EnableSolomonAggregationWorkaround = isSolomon;
+ options.EnableSolomonAggregationWorkaround = outputEncodingContext.IsSolomonPull;
options.Times = readWindow;
options.SummaryPolicy = Config_->GetSummaryPolicy();
options.MarkAggregates = Config_->MarkAggregates;
@@ -837,14 +798,14 @@ void TSolomonExporter::DoHandleShard(
}
}
- encoder->OnStreamBegin();
- Registry_->ReadSensors(options, encoder.Get());
- encoder->OnStreamEnd();
+ outputEncodingContext.Encoder->OnStreamBegin();
+ Registry_->ReadSensors(options, outputEncodingContext.Encoder.Get());
+ outputEncodingContext.Encoder->OnStreamEnd();
guard->Release();
// NB(eshcherbin): Offload inner representation to binary/text format encoding (including compression).
- auto encodeFuture = BIND([buffer, encoder = std::move(encoder)] {
+ auto encodeFuture = BIND([buffer = outputEncodingContext.EncoderBuffer, encoder = std::move(outputEncodingContext.Encoder)] {
encoder->Close();
})
.AsyncVia(EncodingOffloadThreadPool_->GetInvoker())
@@ -854,7 +815,7 @@ void TSolomonExporter::DoHandleShard(
rsp->SetStatus(EStatusCode::OK);
- auto replyBlob = TSharedRef::FromString(buffer->Str());
+ auto replyBlob = TSharedRef::FromString(outputEncodingContext.EncoderBuffer->Str());
responsePromise.Set(replyBlob);
WaitFor(rsp->WriteBody(replyBlob))
diff --git a/yt/yt/library/profiling/solomon/helpers.cpp b/yt/yt/library/profiling/solomon/helpers.cpp
new file mode 100644
index 0000000000..4fe877e672
--- /dev/null
+++ b/yt/yt/library/profiling/solomon/helpers.cpp
@@ -0,0 +1,78 @@
+#include "helpers.h"
+#include "private.h"
+
+#include <yt/yt/core/http/http.h>
+
+#include <library/cpp/monlib/encode/json/json.h>
+#include <library/cpp/monlib/encode/spack/spack_v1.h>
+#include <library/cpp/monlib/encode/prometheus/prometheus.h>
+
+namespace NYT::NProfiling {
+
+using namespace NHttp;
+
+////////////////////////////////////////////////////////////////////////////////
+
+//! Writes the "Content-Type" and "Content-Encoding" response headers
+//! that correspond to the format and compression stored in the context.
+void FillResponseHeaders(const TOutputEncodingContext& outputEncodingContext, const THeadersPtr& headers)
+{
+    TString contentType{::NMonitoring::ContentTypeByFormat(outputEncodingContext.Format)};
+    headers->Set("Content-Type", contentType);
+
+    TString contentEncoding{::NMonitoring::ContentEncodingByCompression(outputEncodingContext.Compression)};
+    headers->Set("Content-Encoding", contentEncoding);
+}
+
+//! Builds the output encoding context (format, compression, solomon flag, buffer and encoder)
+//! from the request headers.
+//!
+//! - "Accept" selects the format; a missing or unrecognized value is served as JSON.
+//! - "Accept-Encoding" selects the compression; an unrecognized value falls back to identity.
+//! - #IsSolomonPullHeaderName, when present, explicitly sets the solomon flag;
+//!   otherwise JSON and SPACK are considered solomon formats.
+//!
+//! Throws if #IsSolomonPullHeaderName cannot be parsed as a boolean
+//! or if the requested format has no encoder here.
+TOutputEncodingContext CreateOutputEncodingContextFromHeaders(const THeadersPtr& headers)
+{
+    TOutputEncodingContext context;
+
+    if (auto accept = headers->Find("Accept")) {
+        context.Format = ::NMonitoring::FormatFromAcceptHeader(*accept);
+    }
+
+    // Unrecognized formats are served as JSON. This must happen before the solomon flag is derived below:
+    // otherwise a response that is actually encoded as JSON would not be treated as a solomon pull.
+    if (context.Format == ::NMonitoring::EFormat::UNKNOWN) {
+        context.Format = ::NMonitoring::EFormat::JSON;
+    }
+
+    if (auto acceptEncoding = headers->Find("Accept-Encoding")) {
+        context.Compression = ::NMonitoring::CompressionFromAcceptEncodingHeader(*acceptEncoding);
+        if (context.Compression == ::NMonitoring::ECompression::UNKNOWN) {
+            // Fallback to identity if we cannot recognize the requested encoding.
+            context.Compression = ::NMonitoring::ECompression::IDENTITY;
+        }
+    }
+
+    if (auto isSolomonPull = headers->Find(IsSolomonPullHeaderName)) {
+        // An explicit header always wins over the format-based default.
+        if (!TryFromString<bool>(*isSolomonPull, context.IsSolomonPull)) {
+            THROW_ERROR_EXCEPTION("Invalid value of %Qv header", IsSolomonPullHeaderName)
+                << TErrorAttribute("value", *isSolomonPull);
+        }
+    } else {
+        context.IsSolomonPull =
+            context.Format == ::NMonitoring::EFormat::JSON ||
+            context.Format == ::NMonitoring::EFormat::SPACK;
+    }
+
+    context.EncoderBuffer = std::make_shared<TStringStream>();
+
+    switch (context.Format) {
+        case ::NMonitoring::EFormat::JSON:
+            context.Encoder = ::NMonitoring::BufferedEncoderJson(context.EncoderBuffer.get());
+            // Only the SPACK encoder below is given a compression; report identity for JSON.
+            context.Compression = ::NMonitoring::ECompression::IDENTITY;
+            break;
+
+        case ::NMonitoring::EFormat::SPACK:
+            context.Encoder = ::NMonitoring::EncoderSpackV1(
+                context.EncoderBuffer.get(),
+                ::NMonitoring::ETimePrecision::SECONDS,
+                context.Compression);
+            break;
+
+        case ::NMonitoring::EFormat::PROMETHEUS:
+            context.Encoder = ::NMonitoring::EncoderPrometheus(context.EncoderBuffer.get());
+            // Same as for JSON: the encoder is not given a compression.
+            context.Compression = ::NMonitoring::ECompression::IDENTITY;
+            break;
+
+        default:
+            THROW_ERROR_EXCEPTION("Unsupported format %Qv", ::NMonitoring::ContentTypeByFormat(context.Format));
+    }
+
+    return context;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NProfiling
diff --git a/yt/yt/library/profiling/solomon/helpers.h b/yt/yt/library/profiling/solomon/helpers.h
new file mode 100644
index 0000000000..4713fd20b7
--- /dev/null
+++ b/yt/yt/library/profiling/solomon/helpers.h
@@ -0,0 +1,32 @@
+#pragma once
+
+#include <yt/yt/core/http/public.h>
+
+#include <library/cpp/monlib/encode/format.h>
+#include <library/cpp/monlib/encode/encoder.h>
+
+namespace NYT::NProfiling {
+
+////////////////////////////////////////////////////////////////////////////////
+
+//! Bundles everything needed to encode sensors into a response:
+//! the chosen format and compression, the solomon-pull flag,
+//! and the encoder together with the buffer it writes into.
+struct TOutputEncodingContext
+{
+ //! Output format; JSON unless overwritten.
+ ::NMonitoring::EFormat Format = ::NMonitoring::EFormat::JSON;
+ //! JSON and SPACK are considered solomon formats, unless this value is explicitly overridden in headers.
+ bool IsSolomonPull = true;
+ //! Output compression; identity unless overwritten.
+ ::NMonitoring::ECompression Compression = ::NMonitoring::ECompression::IDENTITY;
+
+ //! Buffer to be filled by encoder below.
+ std::shared_ptr<TStringStream> EncoderBuffer;
+ //! Encoder writing into #EncoderBuffer; null until a factory fills it in.
+ ::NMonitoring::IMetricEncoderPtr Encoder;
+};
+
+//! Fills content type related headers according to format and compression.
+void FillResponseHeaders(const TOutputEncodingContext& outputEncodingContext, const NHttp::THeadersPtr& headers);
+
+//! Creates output encoder according to request headers.
+TOutputEncodingContext CreateOutputEncodingContextFromHeaders(const NHttp::THeadersPtr& headers);
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NProfiling
diff --git a/yt/yt/library/profiling/solomon/private.h b/yt/yt/library/profiling/solomon/private.h
index c0e53ab680..334287f8d5 100644
--- a/yt/yt/library/profiling/solomon/private.h
+++ b/yt/yt/library/profiling/solomon/private.h
@@ -10,6 +10,8 @@ YT_DEFINE_GLOBAL(const NLogging::TLogger, SolomonLogger, "Solomon");
inline const int DefaultProducerCollectionBatchSize = 100;
+inline static const TString IsSolomonPullHeaderName = "X-YT-IsSolomonPull";
+
////////////////////////////////////////////////////////////////////////////////
} // namespace NYT::NProfiling
diff --git a/yt/yt/library/profiling/solomon/proxy.cpp b/yt/yt/library/profiling/solomon/proxy.cpp
new file mode 100644
index 0000000000..b2e8256501
--- /dev/null
+++ b/yt/yt/library/profiling/solomon/proxy.cpp
@@ -0,0 +1,428 @@
+#include "proxy.h"
+#include "private.h"
+
+#include <yt/yt/core/http/http.h>
+#include <yt/yt/core/http/server.h>
+#include <yt/yt/core/http/client.h>
+#include <yt/yt/core/http/helpers.h>
+
+#include <yt/yt/core/logging/log.h>
+
+#include <yt/yt/core/ytree/fluent.h>
+
+#include <library/cpp/monlib/encode/json/json.h>
+#include <library/cpp/monlib/encode/spack/spack_v1.h>
+#include <library/cpp/monlib/encode/prometheus/prometheus.h>
+
+namespace NYT::NProfiling {
+
+using namespace NConcurrency;
+using namespace NHttp;
+using namespace NYTree;
+using namespace NLogging;
+using namespace NTracing;
+
+////////////////////////////////////////////////////////////////////////////////
+
+const static auto& Logger = SolomonLogger;
+
+////////////////////////////////////////////////////////////////////////////////
+
+// Names of the CGI parameters understood by the proxy.
+// Sharding parameters: index of the requested shard and the total shard count.
+static const TString ShardIndexParameterName("shard_index");
+static const TString ShardCountParameterName("shard_count");
+// Repeatable parameters restricting the pull to given components / instance names.
+static const TString ComponentParameterName("component");
+static const TString InstanceParameterName("instance");
+
+// Parameters starting with this prefix filter instances by label; the rest of the name is the label.
+static const TString InstanceLabelParameterNamePrefix("instance_");
+
+// CGI parameters of the incoming request that are forwarded to the endpoints.
+static const std::vector<TString> ForwardParameterWhitelist = {
+ "period",
+ "now",
+};
+
+// Headers of the incoming request that are forwarded to the endpoints.
+static const std::vector<TString> ForwardHeaderWhitelist = {
+ "X-Solomon-GridSec",
+ "X-Solomon-ClusterId",
+};
+
+// Maximum number of per-endpoint errors attached to the error thrown when every pull fails.
+static constexpr int PullErrorSampleSize = 20;
+
+////////////////////////////////////////////////////////////////////////////////
+
+namespace {
+
+////////////////////////////////////////////////////////////////////////////////
+
+//! Returns the integer value of the CGI parameter #parameterName,
+//! or #defaultValue if the parameter is absent.
+//! Throws if the parameter is present but is not a valid integer.
+int ParseIntegerParameter(const TCgiParameters& parameters, const TString& parameterName, int defaultValue)
+{
+    auto it = parameters.Find(parameterName);
+    if (it == parameters.end()) {
+        return defaultValue;
+    }
+
+    int parsedValue = defaultValue;
+    if (TryFromString<int>(it->second, parsedValue)) {
+        return parsedValue;
+    }
+
+    THROW_ERROR_EXCEPTION("Invalid value of %Qv parameter", parameterName)
+        << TErrorAttribute("value", it->second);
+}
+
+//! Returns #name without its leading #prefix, or null if #name does not start with #prefix.
+std::optional<TString> TrimPrefix(const TString& name, const TString& prefix)
+{
+    if (!name.StartsWith(prefix)) {
+        return std::nullopt;
+    }
+
+    return name.substr(prefix.size());
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace
+
+////////////////////////////////////////////////////////////////////////////////
+
+// Registers the YSON parameters of the proxy config.
+void TSolomonProxyConfig::Register(TRegistrar registrar)
+{
+ // No explicit default value is passed for this one.
+ registrar.Parameter("public_component_names", &TThis::PublicComponentNames)
+ .Default();
+ // Requests selecting more endpoints than this are rejected in GuardedHandleSensors.
+ registrar.Parameter("max_endpoints_per_request", &TThis::MaxEndpointsPerRequest)
+ .Default(30);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+// Stores the config and creates the HTTP client used for pulling endpoints.
+// The client is created with a default-constructed TClientConfig on top of the given poller.
+TSolomonProxy::TSolomonProxy(TSolomonProxyConfigPtr config, IPollerPtr poller)
+ : Config_(std::move(config))
+ , HttpClient_(CreateClient(New<TClientConfig>(), std::move(poller)))
+{ }
+
+//! Adds the provider to the registry, keyed by its component name.
+void TSolomonProxy::RegisterEndpointProvider(const IEndpointProviderPtr& endpointProvider)
+{
+    const auto componentName = endpointProvider->GetComponentName();
+    EmplaceOrCrash(ComponentNameToEndpointProvider_, componentName, endpointProvider);
+}
+
+//! Removes the provider registered under the component name of #endpointProvider.
+void TSolomonProxy::UnregisterEndpointProvider(const IEndpointProviderPtr& endpointProvider)
+{
+    const auto componentName = endpointProvider->GetComponentName();
+    EraseOrCrash(ComponentNameToEndpointProvider_, componentName);
+}
+
+//! Attaches the sensor handler of this proxy to #server under "#prefix/sensors".
+void TSolomonProxy::Register(const TString& prefix, const IServerPtr& server)
+{
+    const auto sensorsPath = prefix + "/sensors";
+    // The callback holds a strong reference to the proxy.
+    server->AddHandler(sensorsPath, BIND(&TSolomonProxy::HandleSensors, MakeStrong(this)));
+    // TODO(achulkov2): Expose available tags/targets?
+}
+
+//! Throws unless 0 <= #shardIndex < #shardCount.
+void TSolomonProxy::ValidateShardingParameters(int shardIndex, int shardCount)
+{
+    const bool isIndexInRange = shardIndex >= 0 && shardIndex < shardCount;
+    if (isIndexInRange) {
+        return;
+    }
+
+    THROW_ERROR_EXCEPTION(
+        "Invalid sharding configuration, %v=%v must be in range [0, %v=%v)",
+        ShardIndexParameterName,
+        shardIndex,
+        ShardCountParameterName,
+        shardCount);
+}
+
+//! Builds the headers sent to every endpoint: a fixed SPACK + ZSTD request,
+//! the solomon-pull flag taken from #outputContext, and the whitelisted headers of the original request.
+THeadersPtr TSolomonProxy::PreparePullHeaders(const THeadersPtr& reqHeaders, const TOutputEncodingContext& outputContext)
+{
+    auto pullHeaders = New<THeaders>();
+
+    // We use this format since it is the only one that supports compression.
+    auto acceptValue = TString(::NMonitoring::ContentTypeByFormat(::NMonitoring::EFormat::SPACK));
+    pullHeaders->Set("Accept", acceptValue);
+    auto acceptEncodingValue = TString(::NMonitoring::ContentEncodingByCompression(::NMonitoring::ECompression::ZSTD));
+    pullHeaders->Set("Accept-Encoding", acceptEncodingValue);
+
+    // We use a separate header to propagate whether or not this was a "solomon" pull. It affects counter-to-rate conversion.
+    const char* isSolomonPullValue = outputContext.IsSolomonPull ? "1" : "0";
+    pullHeaders->Set(IsSolomonPullHeaderName, isSolomonPullValue);
+
+    for (const auto& headerName : ForwardHeaderWhitelist) {
+        auto headerValue = reqHeaders->Find(headerName);
+        if (!headerValue) {
+            continue;
+        }
+        pullHeaders->Set(headerName, *headerValue);
+    }
+
+    return pullHeaders;
+}
+
+//! Builds the CGI parameters sent to every endpoint: only whitelisted parameters are copied over.
+TCgiParameters TSolomonProxy::PreparePullParameters(const TCgiParameters& parameters)
+{
+    TCgiParameters forwardedParameters;
+
+    for (const auto& parameterName : ForwardParameterWhitelist) {
+        auto it = parameters.find(parameterName);
+        if (it == parameters.end()) {
+            continue;
+        }
+        forwardedParameters.insert(*it);
+    }
+
+    return forwardedParameters;
+}
+
+//! Collects the values of all "component" parameters.
+//! The component set stays null when no such parameter is present.
+TSolomonProxy::TComponentMatcher TSolomonProxy::GetComponentMatcher(const TCgiParameters& parameters)
+{
+    TComponentMatcher matcher;
+
+    if (!parameters.Has(ComponentParameterName)) {
+        return matcher;
+    }
+
+    matcher.Components.emplace();
+    for (const auto& requestedComponent : parameters.Range(ComponentParameterName)) {
+        matcher.Components->insert(requestedComponent);
+    }
+
+    return matcher;
+}
+
+//! A provider matches if no components were requested or if its component name is among the requested ones.
+bool TSolomonProxy::MatchComponent(const TComponentMatcher& componentMatcher, const IEndpointProviderPtr& endpointProvider)
+{
+    const auto& requestedComponents = componentMatcher.Components;
+    if (!requestedComponents) {
+        return true;
+    }
+
+    return requestedComponents->contains(endpointProvider->GetComponentName());
+}
+
+//! Builds the instance filter from the "instance" parameters (instance names)
+//! and from the parameters prefixed with "instance_" (acceptable label values).
+TSolomonProxy::TInstanceFilter TSolomonProxy::GetInstanceFilter(const TCgiParameters& parameters)
+{
+    TInstanceFilter filter;
+
+    for (const auto& [name, value] : parameters) {
+        if (name == InstanceParameterName) {
+            // The set of instance names is created lazily, on the first "instance" parameter.
+            if (!filter.Instances) {
+                filter.Instances.emplace();
+            }
+            filter.Instances->insert(value);
+            continue;
+        }
+
+        auto labelName = TrimPrefix(name, InstanceLabelParameterNamePrefix);
+        if (labelName) {
+            filter.LabelFilter[*labelName].insert(value);
+        }
+    }
+
+    return filter;
+}
+
+//! Returns the endpoints accepted by #instanceFilter, in their original order.
+std::vector<IEndpointProvider::TEndpoint> TSolomonProxy::FilterInstances(
+    const TInstanceFilter& instanceFilter,
+    const std::vector<IEndpointProvider::TEndpoint>& endpoints)
+{
+    const auto& selectedNames = instanceFilter.Instances;
+
+    // For all specified labels, the endpoint must have the label and its value must be one of the selected ones.
+    auto matchesLabelFilter = [&] (const IEndpointProvider::TEndpoint& endpoint) {
+        for (const auto& [label, acceptableValues] : instanceFilter.LabelFilter) {
+            auto labelIt = endpoint.Labels.find(label);
+            if (labelIt == endpoint.Labels.end()) {
+                return false;
+            }
+            if (!acceptableValues.contains(labelIt->second)) {
+                return false;
+            }
+        }
+        return true;
+    };
+
+    std::vector<IEndpointProvider::TEndpoint> result;
+    result.reserve(endpoints.size());
+
+    for (const auto& endpoint : endpoints) {
+        // Instance names restrict the selection only if they were specified.
+        const bool isNameSelected = !selectedNames || selectedNames->contains(endpoint.Name);
+        if (isNameSelected && matchesLabelFilter(endpoint)) {
+            result.push_back(endpoint);
+        }
+    }
+
+    return result;
+}
+
+//! Returns the addresses to pull for the given request parameters and shard.
+//!
+//! A component contributes only if it is public (when a public list is configured) and requested by the user;
+//! its endpoints are then narrowed by the instance filter and by the shard they hash into.
+std::vector<TString> TSolomonProxy::CollectEndpoints(const TCgiParameters& parameters, int shardIndex, int shardCount) const
+{
+    auto componentMatcher = GetComponentMatcher(parameters);
+    auto instanceFilter = GetInstanceFilter(parameters);
+
+    YT_LOG_DEBUG(
+        "Selecting endpoints (PublicComponentNames: %v, RequestedComponents: %v, RequestedInstances: %v, InstanceLabelFilter: %v)",
+        Config_->PublicComponentNames,
+        componentMatcher.Components,
+        instanceFilter.Instances,
+        instanceFilter.LabelFilter);
+
+    // An address belongs to the shard its fingerprint maps to.
+    auto belongsToShard = [&] (const TString& address) {
+        return FarmFingerprint(address) % shardCount == shardIndex;
+    };
+
+    int selectedComponentCount = 0;
+    int internallySkippedCount = 0;
+    int userSkippedCount = 0;
+
+    std::vector<TString> selectedAddresses;
+
+    for (const auto& [componentName, endpointProvider] : ComponentNameToEndpointProvider_) {
+        YT_VERIFY(componentName == endpointProvider->GetComponentName());
+
+        const auto& publicNames = Config_->PublicComponentNames;
+        if (publicNames && !publicNames->contains(componentName)) {
+            ++internallySkippedCount;
+            YT_LOG_DEBUG("Skipping non-public component (ComponentName: %v)", componentName);
+            continue;
+        }
+
+        if (!MatchComponent(componentMatcher, endpointProvider)) {
+            ++userSkippedCount;
+            YT_LOG_DEBUG("Skipping non-requested component (ComponentName: %v)", componentName);
+            continue;
+        }
+
+        ++selectedComponentCount;
+
+        auto componentEndpoints = endpointProvider->GetEndpoints();
+        auto addressCountBefore = selectedAddresses.size();
+        auto matchingEndpoints = FilterInstances(instanceFilter, componentEndpoints);
+        for (const auto& endpoint : matchingEndpoints) {
+            if (belongsToShard(endpoint.Address)) {
+                selectedAddresses.push_back(endpoint.Address);
+            }
+        }
+
+        // For debug purposes: how many endpoints of the component fall into the shard regardless of the instance filter.
+        int shardEndpointCount = 0;
+        for (const auto& endpoint : componentEndpoints) {
+            if (belongsToShard(endpoint.Address)) {
+                ++shardEndpointCount;
+            }
+        }
+
+        YT_LOG_DEBUG(
+            "Selected component endpoints for pull (ComponentName: %v, SelectedEndpointCount: %v, TotalEndpointCount: %v, InstanceFilterMatchCount: %v, ShardEndpointCount: %v)",
+            componentName,
+            selectedAddresses.size() - addressCountBefore,
+            componentEndpoints.size(),
+            matchingEndpoints.size(),
+            shardEndpointCount);
+    }
+
+    YT_LOG_DEBUG(
+        "Collected endpoints from components (EndpointCount: %v, ComponentCount: %v, ComponentSkippedByInternalFilterCount: %v, ComponentSkippedByUserFilterCount: %v)",
+        selectedAddresses.size(),
+        selectedComponentCount,
+        internallySkippedCount,
+        userSkippedCount);
+
+    return selectedAddresses;
+}
+
+void TSolomonProxy::HandleSensors(const IRequestPtr& req, const IResponseWriterPtr& rsp)
+{
+ try {
+ GuardedHandleSensors(req, rsp);
+ } catch(const std::exception& ex) {
+ YT_LOG_DEBUG(ex, "Failed to pull sensors from endpoints");
+
+ if (!rsp->AreHeadersFlushed()) {
+ try {
+ rsp->SetStatus(EStatusCode::InternalServerError);
+ rsp->GetHeaders()->Remove("Content-Type");
+ rsp->GetHeaders()->Remove("Content-Encoding");
+
+ ReplyError(rsp, ex);
+ } catch (const std::exception& ex) {
+ YT_LOG_DEBUG(ex, "Failed to send sensor pull error");
+ }
+ }
+ }
+}
+
+// Serves a single sensor request:
+//   1. parses and validates the sharding parameters;
+//   2. builds the output encoder from the request headers;
+//   3. selects the endpoints for this shard and pulls all of them concurrently;
+//   4. decodes every successful SPACK response into the output encoder;
+//   5. writes the encoded result as the response body.
+// Throws on invalid parameters, on too many selected endpoints, when every pull has failed,
+// and on response write failure. Partial pull failures are only logged.
+void TSolomonProxy::GuardedHandleSensors(const IRequestPtr& req, const IResponseWriterPtr& rsp)
+{
+ auto reqUrlRef = req->GetUrl();
+ TCgiParameters parameters(reqUrlRef.RawQuery);
+
+ // Without sharding parameters the request addresses the only shard of one.
+ auto shardIndex = ParseIntegerParameter(parameters, ShardIndexParameterName, /*defaultValue*/ 0);
+ auto shardCount = ParseIntegerParameter(parameters, ShardCountParameterName, /*defaultValue*/ 1);
+
+ YT_LOG_DEBUG(
+ "Performing solomon proxy fetch (ShardIndex: %v, ShardCount: %v, Query: %v)",
+ shardIndex,
+ shardCount,
+ reqUrlRef.RawQuery);
+
+ ValidateShardingParameters(shardIndex, shardCount);
+
+ // Format and compression of our own response are derived from the headers of the incoming request.
+ auto outputEncodingContext = CreateOutputEncodingContextFromHeaders(req->GetHeaders());
+
+ auto filteredEndpoints = CollectEndpoints(parameters, shardIndex, shardCount);
+
+ // TODO(achulkov2): Ideally, we would want some sort of memory accounting here.
+ if (std::ssize(filteredEndpoints) > Config_->MaxEndpointsPerRequest) {
+ THROW_ERROR_EXCEPTION("Cannot pull sensors from %v endpoints at once, please retry the request with a larger shard count", filteredEndpoints.size())
+ << TErrorAttribute("max_endpoints_per_request", Config_->MaxEndpointsPerRequest);
+ }
+
+ std::vector<TFuture<IResponsePtr>> asyncPullResponses;
+ asyncPullResponses.reserve(filteredEndpoints.size());
+
+ // The same headers and parameters are sent to every endpoint.
+ auto pullHeaders = PreparePullHeaders(req->GetHeaders(), outputEncodingContext);
+ auto pullParameters = PreparePullParameters(parameters);
+
+ YT_LOG_DEBUG(
+ "Proxying pull requests to hosts (EndpointCount: %v, Parameters: %v, Headers: %v)",
+ filteredEndpoints.size(),
+ // This is easier to read than the escaped string.
+ std::vector<std::pair<TString, TString>>(pullParameters.begin(), pullParameters.end()),
+ pullHeaders->Dump());
+
+ // The loop variable is a copy on purpose: the query string is appended to it.
+ // NOTE(review): this assumes endpoint addresses carry no query string of their own — confirm.
+ for (auto pullUrl : filteredEndpoints) {
+ if (!pullParameters.empty()) {
+ pullUrl += Format("?%v", pullParameters.Print());
+ }
+
+ asyncPullResponses.push_back(HttpClient_->Get(pullUrl, pullHeaders));
+ }
+
+ // Each element below is either a response or an error, so one failed pull does not fail the others.
+ auto pullResponseOrErrors = WaitFor(AllSet(asyncPullResponses))
+ .ValueOrThrow();
+
+ std::vector<TError> pullErrors;
+
+ for (const auto& pullResponseOrError : pullResponseOrErrors) {
+ if (!pullResponseOrError.IsOK()) {
+ YT_LOG_DEBUG(pullResponseOrError, "Error while pulling sensors from endpoint");
+ pullErrors.push_back(pullResponseOrError);
+ continue;
+ }
+
+ const auto& pullResponse = pullResponseOrError.Value();
+ auto body = pullResponse->ReadAll();
+
+ // A non-OK status counts as a pull failure; the body is kept as an error attribute.
+ if (pullResponse->GetStatusCode() != EStatusCode::OK) {
+ YT_LOG_DEBUG("Sensor pull failed (StatusCode: %v, Response: %v)", pullResponse->GetStatusCode(), body);
+ pullErrors.push_back(
+ TError("Sensor pull failed with status code %v", pullResponse->GetStatusCode())
+ << TErrorAttribute("response_body", ToString(body))
+ << TErrorAttribute("status_code", pullResponse->GetStatusCode()));
+ continue;
+ }
+
+ // Endpoints are asked for SPACK (see PreparePullHeaders), so the body is decoded as such
+ // with the output encoder as the consumer.
+ // NOTE(review): an exception from ReadAll or from decoding is not collected into pullErrors
+ // and fails the whole request — confirm this is intended.
+ TMemoryInput stream(body.Begin(), body.Size());
+ ::NMonitoring::DecodeSpackV1(&stream, outputEncodingContext.Encoder.Get());
+ }
+
+ YT_LOG_DEBUG(
+ "Pulled sensors from endpoints (RequestCount: %v, SuccessCount: %v, FailureCount: %v)",
+ asyncPullResponses.size(),
+ asyncPullResponses.size() - pullErrors.size(),
+ pullErrors.size());
+
+ // The request fails only if every single pull has failed; with no endpoints at all it succeeds.
+ // TODO(achulkov2): Think about more introspection of pull failures for other cases.
+ if (!pullErrors.empty() && pullErrors.size() == asyncPullResponses.size()) {
+ auto totalErrorCount = pullErrors.size();
+ // Attach at most PullErrorSampleSize inner errors.
+ if (std::ssize(pullErrors) > PullErrorSampleSize) {
+ pullErrors.resize(PullErrorSampleSize);
+ }
+ THROW_ERROR_EXCEPTION("Could not pull sensors from any endpoint")
+ << TErrorAttribute("sampled_error_count", pullErrors.size())
+ << TErrorAttribute("total_error_count", totalErrorCount)
+ << pullErrors;
+ }
+
+ // TODO(achulkov2): Maybe offload this to a separate thread pool like it is done in the exporter.
+ outputEncodingContext.Encoder->Close();
+
+ rsp->SetStatus(EStatusCode::OK);
+ FillResponseHeaders(outputEncodingContext, rsp->GetHeaders());
+
+ // The encoder buffer is read only after the encoder has been closed above.
+ auto replyBlob = TSharedRef::FromString(outputEncodingContext.EncoderBuffer->Str());
+
+ WaitFor(rsp->WriteBody(replyBlob))
+ .ThrowOnError();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NProfiling
diff --git a/yt/yt/library/profiling/solomon/proxy.h b/yt/yt/library/profiling/solomon/proxy.h
new file mode 100644
index 0000000000..937b098461
--- /dev/null
+++ b/yt/yt/library/profiling/solomon/proxy.h
@@ -0,0 +1,118 @@
+#pragma once
+
+#include "public.h"
+#include "helpers.h"
+
+#include <yt/yt/core/http/public.h>
+
+#include <yt/yt/core/concurrency/public.h>
+
+#include <yt/yt/core/ytree/yson_struct.h>
+
+#include <library/cpp/yt/farmhash/farm_hash.h>
+
+#include <library/cpp/cgiparam/cgiparam.h>
+
+namespace NYT::NProfiling {
+
+////////////////////////////////////////////////////////////////////////////////
+
+//! Source of endpoints for a single named component.
+//! The proxy keys registered providers by component name and asks them for endpoints on every request.
+struct IEndpointProvider
+ : public TRefCounted
+{
+public:
+ //! Description of a single endpoint sensors can be pulled from.
+ struct TEndpoint
+ {
+ //! Logical name of the endpoint.
+ //! Matched against the "instance" request parameters.
+ TString Name;
+ //! Actual address to pull sensors from.
+ //! Also determines the shard the endpoint belongs to.
+ TString Address;
+ //! A set of labels which can be filtered upon.
+ //! Maps a label name to its value.
+ THashMap<TString, TString> Labels;
+ };
+
+ //! Human-readable component name.
+ virtual TString GetComponentName() const = 0;
+ //! This method is allowed to return a cached response.
+ virtual std::vector<TEndpoint> GetEndpoints() const = 0;
+};
+
+DEFINE_REFCOUNTED_TYPE(IEndpointProvider)
+
+////////////////////////////////////////////////////////////////////////////////
+
+//! Configuration of the solomon proxy.
+struct TSolomonProxyConfig
+ : public NYTree::TYsonStruct
+{
+ //! A list of publicly available components.
+ //! Endpoints from other components are not pulled from.
+ //! Currently, this filter is applied for all requests.
+ //! If null, no component is filtered out by this setting.
+ //! TODO(achulkov2): Add some form of access control to allow bypassing this check.
+ std::optional<THashSet<TString>> PublicComponentNames;
+
+ //! Forbids pulling sensors from too many endpoints within a single request.
+ //! Instead of increasing this limit, one should make requests with a larger shard count.
+ //! Registered with a default of 30.
+ int MaxEndpointsPerRequest;
+
+ REGISTER_YSON_STRUCT(TSolomonProxyConfig);
+
+ static void Register(TRegistrar registrar);
+};
+
+DEFINE_REFCOUNTED_TYPE(TSolomonProxyConfig)
+
+////////////////////////////////////////////////////////////////////////////////
+
+//! HTTP proxy which pulls sensors from the endpoints of registered components
+//! and returns them re-encoded in a single response.
+class TSolomonProxy
+ : public TRefCounted
+{
+public:
+ TSolomonProxy(TSolomonProxyConfigPtr config, NConcurrency::IPollerPtr poller);
+
+ //! Providers are keyed by component name; registering a duplicate name
+ //! or unregistering an unknown one crashes.
+ // NOTE(review): no synchronization around the provider map is visible in this class —
+ // confirm that (un)registration never runs concurrently with request handling.
+ void RegisterEndpointProvider(const IEndpointProviderPtr& endpointProvider);
+ void UnregisterEndpointProvider(const IEndpointProviderPtr& endpointProvider);
+
+ //! Adds the "<prefix>/sensors" handler to the given server.
+ void Register(const TString& prefix, const NHttp::IServerPtr& server);
+
+private:
+ const TSolomonProxyConfigPtr Config_;
+ //! Client used for pulling sensors from endpoints.
+ const NHttp::IClientPtr HttpClient_;
+ THashMap<TString, IEndpointProviderPtr> ComponentNameToEndpointProvider_;
+
+ //! Wraps the guarded handler and replies with an error if it throws.
+ void HandleSensors(const NHttp::IRequestPtr& req, const NHttp::IResponseWriterPtr& rsp);
+ void GuardedHandleSensors(const NHttp::IRequestPtr& req, const NHttp::IResponseWriterPtr& rsp);
+
+ //! Throws unless 0 <= shardIndex < shardCount.
+ static void ValidateShardingParameters(int shardIndex, int shardCount);
+
+ struct TComponentMatcher
+ {
+ //! Requested component names; null means that every component matches.
+ std::optional<THashSet<TString>> Components;
+ };
+
+ static TComponentMatcher GetComponentMatcher(const TCgiParameters& parameters);
+ static bool MatchComponent(const TComponentMatcher& componentMatcher, const IEndpointProviderPtr& endpointProvider);
+
+ struct TInstanceFilter
+ {
+ //! Requested endpoint names; null means that names are not filtered.
+ std::optional<THashSet<TString>> Instances;
+ //! Acceptable values per label; an endpoint must satisfy every listed label.
+ THashMap<TString, THashSet<TString>> LabelFilter;
+ };
+
+ static TInstanceFilter GetInstanceFilter(const TCgiParameters& parameters);
+ static std::vector<IEndpointProvider::TEndpoint> FilterInstances(
+ const TInstanceFilter& instanceFilter,
+ const std::vector<IEndpointProvider::TEndpoint>& endpoints);
+
+ //! Returns the addresses of endpoints selected by the request parameters for the given shard.
+ std::vector<TString> CollectEndpoints(const TCgiParameters& parameters, int shardIndex, int shardCount) const;
+
+ //! For safety we only proxy whitelisted headers.
+ static NHttp::THeadersPtr PreparePullHeaders(const NHttp::THeadersPtr& reqHeaders, const TOutputEncodingContext& outputContext);
+ //! For safety we only proxy whitelisted parameters.
+ static TCgiParameters PreparePullParameters(const TCgiParameters& parameters);
+};
+
+DEFINE_REFCOUNTED_TYPE(TSolomonProxy)
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NProfiling
diff --git a/yt/yt/library/profiling/solomon/proxy_example/main.cpp b/yt/yt/library/profiling/solomon/proxy_example/main.cpp
new file mode 100644
index 0000000000..5f01397780
--- /dev/null
+++ b/yt/yt/library/profiling/solomon/proxy_example/main.cpp
@@ -0,0 +1,80 @@
+#include <yt/yt/library/profiling/solomon/proxy.h>
+
+#include <yt/yt/core/concurrency/poller.h>
+#include <yt/yt/core/concurrency/thread_pool.h>
+#include <yt/yt/core/concurrency/thread_pool_poller.h>
+
+#include <yt/yt/core/http/server.h>
+
+#include <util/generic/yexception.h>
+
+using namespace NYT;
+using namespace NYT::NHttp;
+using namespace NYT::NConcurrency;
+using namespace NYT::NProfiling;
+
+////////////////////////////////////////////////////////////////////////////////
+
+//! Endpoint provider for the example: exposes a fixed list of addresses as the "static" component.
+//! The produced endpoints have only the address set; names and labels are left empty.
+class TStaticEndpointProvider
+    : public IEndpointProvider
+{
+public:
+    //! Explicit, so that a vector of strings never silently converts into a provider.
+    explicit TStaticEndpointProvider(std::vector<TString> addresses)
+        : Addresses_(std::move(addresses))
+    { }
+
+    TString GetComponentName() const override
+    {
+        return "static";
+    }
+
+    std::vector<TEndpoint> GetEndpoints() const override
+    {
+        std::vector<TEndpoint> endpoints;
+        // The result size is known upfront, avoid reallocations.
+        endpoints.reserve(Addresses_.size());
+        for (const auto& address : Addresses_) {
+            endpoints.push_back({.Address = address});
+        }
+
+        return endpoints;
+    }
+
+private:
+    std::vector<TString> Addresses_;
+};
+
+//! Example entry point: serves a solomon proxy over a static list of endpoints on the port given as the only argument.
+//! Any failure is printed to stderr and terminates the process with exit code 1.
+int main(int argc, char* argv[])
+{
+    try {
+        if (argc != 2) {
+            throw yexception() << "usage: " << argv[0] << " PORT";
+        }
+
+        const auto port = FromString<int>(argv[1]);
+        auto poller = CreateThreadPoolPoller(1, "Example");
+        auto server = CreateServer(port, poller);
+
+        auto proxy = New<TSolomonProxy>(New<TSolomonProxyConfig>(), poller);
+
+        // Placeholders: replace with the addresses to pull sensors from.
+        auto addresses = std::vector<TString>{
+            "list",
+            "your",
+            "hosts",
+            "here"
+        };
+        proxy->RegisterEndpointProvider(New<TStaticEndpointProvider>(addresses));
+
+        proxy->Register("/solomon", server);
+
+        server->Start();
+
+        // Keep the process alive while the server handles requests.
+        TDelayedExecutor::WaitForDuration(TDuration::Max());
+    } catch (const std::exception& ex) {
+        Cerr << ex.what() << Endl;
+        _exit(1);
+    }
+
+    return 0;
+}
+
+////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt/library/profiling/solomon/proxy_example/ya.make b/yt/yt/library/profiling/solomon/proxy_example/ya.make
new file mode 100644
index 0000000000..ec8e898f5d
--- /dev/null
+++ b/yt/yt/library/profiling/solomon/proxy_example/ya.make
@@ -0,0 +1,11 @@
+PROGRAM(proxy-example)
+
+INCLUDE(${ARCADIA_ROOT}/yt/ya_cpp.make.inc)
+
+SRCS(main.cpp)
+
+PEERDIR(
+ yt/yt/library/profiling/solomon
+)
+
+END()
diff --git a/yt/yt/library/profiling/solomon/public.h b/yt/yt/library/profiling/solomon/public.h
index 02bf5ad2c9..d497d9f9b9 100644
--- a/yt/yt/library/profiling/solomon/public.h
+++ b/yt/yt/library/profiling/solomon/public.h
@@ -8,8 +8,13 @@ namespace NYT::NProfiling {
DECLARE_REFCOUNTED_STRUCT(TShardConfig)
DECLARE_REFCOUNTED_STRUCT(TSolomonExporterConfig)
+DECLARE_REFCOUNTED_STRUCT(TSolomonProxyConfig)
+
DECLARE_REFCOUNTED_CLASS(TSolomonExporter)
DECLARE_REFCOUNTED_CLASS(TSolomonRegistry)
+DECLARE_REFCOUNTED_CLASS(TSolomonProxy)
+
+DECLARE_REFCOUNTED_STRUCT(IEndpointProvider)
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt/library/profiling/solomon/ya.make b/yt/yt/library/profiling/solomon/ya.make
index 4b20516ed2..d578cc3e76 100644
--- a/yt/yt/library/profiling/solomon/ya.make
+++ b/yt/yt/library/profiling/solomon/ya.make
@@ -5,8 +5,10 @@ INCLUDE(${ARCADIA_ROOT}/yt/ya_cpp.make.inc)
SRCS(
cube.cpp
exporter.cpp
+ helpers.cpp
percpu.cpp
producer.cpp
+ proxy.cpp
registry.cpp
remote.cpp
sensor.cpp
@@ -31,3 +33,7 @@ PEERDIR(
)
END()
+
+RECURSE(
+ proxy_example
+)