diff options
author | robot-piglet <robot-piglet@yandex-team.com> | 2024-06-11 19:37:16 +0300 |
---|---|---|
committer | robot-piglet <robot-piglet@yandex-team.com> | 2024-06-12 11:17:53 +0300 |
commit | f916f06348b500917c61c50b571aaa9ee095f982 (patch) | |
tree | 564309cd4f0eb1d9cbbe4aa037c4cdcbee8b7bfc | |
parent | 61a0cd4e405691bb1309ada13b972303daa24a62 (diff) | |
download | ydb-f916f06348b500917c61c50b571aaa9ee095f982.tar.gz |
Intermediate changes
-rw-r--r-- | yt/yt/library/profiling/solomon/exporter.cpp | 67 | ||||
-rw-r--r-- | yt/yt/library/profiling/solomon/helpers.cpp | 78 | ||||
-rw-r--r-- | yt/yt/library/profiling/solomon/helpers.h | 32 | ||||
-rw-r--r-- | yt/yt/library/profiling/solomon/private.h | 2 | ||||
-rw-r--r-- | yt/yt/library/profiling/solomon/proxy.cpp | 428 | ||||
-rw-r--r-- | yt/yt/library/profiling/solomon/proxy.h | 118 | ||||
-rw-r--r-- | yt/yt/library/profiling/solomon/proxy_example/main.cpp | 80 | ||||
-rw-r--r-- | yt/yt/library/profiling/solomon/proxy_example/ya.make | 11 | ||||
-rw-r--r-- | yt/yt/library/profiling/solomon/public.h | 5 | ||||
-rw-r--r-- | yt/yt/library/profiling/solomon/ya.make | 6 |
10 files changed, 774 insertions, 53 deletions
diff --git a/yt/yt/library/profiling/solomon/exporter.cpp b/yt/yt/library/profiling/solomon/exporter.cpp index ec85275605..0bbaab7745 100644 --- a/yt/yt/library/profiling/solomon/exporter.cpp +++ b/yt/yt/library/profiling/solomon/exporter.cpp @@ -1,6 +1,7 @@ #include "exporter.h" #include "private.h" #include "sensor_service.h" +#include "helpers.h" #include <yt/yt/build/build.h> @@ -620,47 +621,8 @@ void TSolomonExporter::DoHandleShard( auto Logger = NProfiling::Logger().WithTag("Shard: %v", name); try { - auto format = NMonitoring::EFormat::JSON; - if (auto accept = req->GetHeaders()->Find("Accept")) { - format = NMonitoring::FormatFromAcceptHeader(*accept); - } - - NMonitoring::ECompression compression = NMonitoring::ECompression::IDENTITY; - if (auto acceptEncoding = req->GetHeaders()->Find("Accept-Encoding")) { - compression = NMonitoring::CompressionFromAcceptEncodingHeader(*acceptEncoding); - if (compression == NMonitoring::ECompression::UNKNOWN) { - // Fallback to identity if we cannot recognize the requested encoding. - compression = NMonitoring::ECompression::IDENTITY; - } - } - - auto buffer = std::make_shared<TStringStream>(); - NMonitoring::IMetricEncoderPtr encoder; - switch (format) { - case NMonitoring::EFormat::UNKNOWN: - case NMonitoring::EFormat::JSON: - encoder = NMonitoring::BufferedEncoderJson(buffer.get()); - format = NMonitoring::EFormat::JSON; - compression = NMonitoring::ECompression::IDENTITY; - break; - - case NMonitoring::EFormat::SPACK: - encoder = NMonitoring::EncoderSpackV1( - buffer.get(), - NMonitoring::ETimePrecision::SECONDS, - compression); - break; - - case NMonitoring::EFormat::PROMETHEUS: - encoder = NMonitoring::EncoderPrometheus(buffer.get()); - break; - - default: - THROW_ERROR_EXCEPTION("Unsupported format %Qv", NMonitoring::ContentTypeByFormat(format)); - } - - rsp->GetHeaders()->Set("Content-Type", TString{NMonitoring::ContentTypeByFormat(format)}); - rsp->GetHeaders()->Set("Content-Encoding", TString{NMonitoring::ContentEncodingByCompression(compression)}); + auto outputEncodingContext = CreateOutputEncodingContextFromHeaders(req->GetHeaders()); + FillResponseHeaders(outputEncodingContext, rsp->GetHeaders()); TCgiParameters params(req->GetUrl().RawQuery); @@ -712,8 +674,8 @@ void TSolomonExporter::DoHandleShard( if (now && period) { cacheKey = TCacheKey{ .Shard = name, - .Format = format, - .Compression = compression, + .Format = outputEncodingContext.Format, + .Compression = outputEncodingContext.Compression, .Now = *now, .Period = *period, .Grid = readGridStep, @@ -722,8 +684,8 @@ void TSolomonExporter::DoHandleShard( auto solomonCluster = req->GetHeaders()->Find("X-Solomon-ClusterId"); YT_LOG_DEBUG("Processing sensor pull (Format: %v, Compression: %v, SolomonCluster: %v, Now: %v, Period: %v, Grid: %v)", - format, - compression, + outputEncodingContext.Format, + outputEncodingContext.Compression, solomonCluster ? *solomonCluster : "", now, period, @@ -800,8 +762,7 @@ void TSolomonExporter::DoHandleShard( options.Host = Config_->Host; options.InstanceTags = std::vector<TTag>{Config_->InstanceTags.begin(), Config_->InstanceTags.end()}; - auto isSolomon = format == NMonitoring::EFormat::JSON || format == NMonitoring::EFormat::SPACK; - if (Config_->ConvertCountersToRateForSolomon && isSolomon) { + if (Config_->ConvertCountersToRateForSolomon && outputEncodingContext.IsSolomonPull) { options.ConvertCountersToRateGauge = true; options.RenameConvertedCounters = Config_->RenameConvertedCounters; @@ -811,7 +772,7 @@ void TSolomonExporter::DoHandleShard( } } - options.EnableSolomonAggregationWorkaround = isSolomon; + options.EnableSolomonAggregationWorkaround = outputEncodingContext.IsSolomonPull; options.Times = readWindow; options.SummaryPolicy = Config_->GetSummaryPolicy(); options.MarkAggregates = Config_->MarkAggregates; @@ -837,14 +798,14 @@ void TSolomonExporter::DoHandleShard( } } - encoder->OnStreamBegin(); - Registry_->ReadSensors(options, encoder.Get()); - encoder->OnStreamEnd(); + outputEncodingContext.Encoder->OnStreamBegin(); + Registry_->ReadSensors(options, outputEncodingContext.Encoder.Get()); + outputEncodingContext.Encoder->OnStreamEnd(); guard->Release(); // NB(eshcherbin): Offload inner representation to binary/text format encoding (including compression). - auto encodeFuture = BIND([buffer, encoder = std::move(encoder)] { + auto encodeFuture = BIND([buffer = outputEncodingContext.EncoderBuffer, encoder = std::move(outputEncodingContext.Encoder)] { encoder->Close(); }) .AsyncVia(EncodingOffloadThreadPool_->GetInvoker()) @@ -854,7 +815,7 @@ void TSolomonExporter::DoHandleShard( rsp->SetStatus(EStatusCode::OK); - auto replyBlob = TSharedRef::FromString(buffer->Str()); + auto replyBlob = TSharedRef::FromString(outputEncodingContext.EncoderBuffer->Str()); responsePromise.Set(replyBlob); WaitFor(rsp->WriteBody(replyBlob)) diff --git a/yt/yt/library/profiling/solomon/helpers.cpp b/yt/yt/library/profiling/solomon/helpers.cpp new file mode 100644 index 0000000000..4fe877e672 --- /dev/null +++ b/yt/yt/library/profiling/solomon/helpers.cpp @@ -0,0 +1,78 @@ +#include "helpers.h" +#include "private.h" + +#include <yt/yt/core/http/http.h> + +#include <library/cpp/monlib/encode/json/json.h> +#include <library/cpp/monlib/encode/spack/spack_v1.h> +#include <library/cpp/monlib/encode/prometheus/prometheus.h> + +namespace NYT::NProfiling { + +using namespace NHttp; + +//////////////////////////////////////////////////////////////////////////////// + +void FillResponseHeaders(const TOutputEncodingContext& outputEncodingContext, const THeadersPtr& headers) +{ + headers->Set("Content-Type", TString{::NMonitoring::ContentTypeByFormat(outputEncodingContext.Format)}); + headers->Set("Content-Encoding", TString{::NMonitoring::ContentEncodingByCompression(outputEncodingContext.Compression)}); +} + +TOutputEncodingContext CreateOutputEncodingContextFromHeaders(const THeadersPtr& headers) +{ + TOutputEncodingContext context; + + if (auto accept = headers->Find("Accept")) { + context.Format = ::NMonitoring::FormatFromAcceptHeader(*accept); + } + + if (auto acceptEncoding = headers->Find("Accept-Encoding")) { + context.Compression = ::NMonitoring::CompressionFromAcceptEncodingHeader(*acceptEncoding); + if (context.Compression == ::NMonitoring::ECompression::UNKNOWN) { + // Fallback to identity if we cannot recognize the requested encoding. + context.Compression = ::NMonitoring::ECompression::IDENTITY; + } + } + + if (auto isSolomonPull = headers->Find(IsSolomonPullHeaderName)) { + if (!TryFromString<bool>(*isSolomonPull, context.IsSolomonPull)) { + THROW_ERROR_EXCEPTION("Invalid value of %Qv header", IsSolomonPullHeaderName) + << TErrorAttribute("value", *isSolomonPull); + } + } else { + context.IsSolomonPull = context.Format == ::NMonitoring::EFormat::JSON || context.Format == ::NMonitoring::EFormat::SPACK; + } + + context.EncoderBuffer = std::make_shared<TStringStream>(); + + switch (context.Format) { + case ::NMonitoring::EFormat::UNKNOWN: + case ::NMonitoring::EFormat::JSON: + context.Encoder = ::NMonitoring::BufferedEncoderJson(context.EncoderBuffer.get()); + context.Format = ::NMonitoring::EFormat::JSON; + context.Compression = ::NMonitoring::ECompression::IDENTITY; + break; + + case ::NMonitoring::EFormat::SPACK: + context.Encoder = ::NMonitoring::EncoderSpackV1( + context.EncoderBuffer.get(), + ::NMonitoring::ETimePrecision::SECONDS, + context.Compression); + break; + + case ::NMonitoring::EFormat::PROMETHEUS: + context.Encoder = ::NMonitoring::EncoderPrometheus(context.EncoderBuffer.get()); + context.Compression = ::NMonitoring::ECompression::IDENTITY; + break; + + default: + THROW_ERROR_EXCEPTION("Unsupported format %Qv", ::NMonitoring::ContentTypeByFormat(context.Format)); + } + + return context; +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NProfiling diff --git a/yt/yt/library/profiling/solomon/helpers.h b/yt/yt/library/profiling/solomon/helpers.h new file mode 100644 index 0000000000..4713fd20b7 --- /dev/null +++ b/yt/yt/library/profiling/solomon/helpers.h @@ -0,0 +1,32 @@ +#pragma once + +#include <yt/yt/core/http/public.h> + +#include <library/cpp/monlib/encode/format.h> +#include <library/cpp/monlib/encode/encoder.h> + +namespace NYT::NProfiling { + +//////////////////////////////////////////////////////////////////////////////// + +struct TOutputEncodingContext +{ + ::NMonitoring::EFormat Format = ::NMonitoring::EFormat::JSON; + //! JSON and SPACK are considered solomon formats, unless this value is explicitly overriden in headers. + bool IsSolomonPull = true; + ::NMonitoring::ECompression Compression = ::NMonitoring::ECompression::IDENTITY; + + //! Buffer to be filled by encoder below. + std::shared_ptr<TStringStream> EncoderBuffer; + ::NMonitoring::IMetricEncoderPtr Encoder; +}; + +//! Fills content type related headers according to format and compression. +void FillResponseHeaders(const TOutputEncodingContext& outputEncodingContext, const NHttp::THeadersPtr& headers); + +//! Creates output encoder according to request headers. +TOutputEncodingContext CreateOutputEncodingContextFromHeaders(const NHttp::THeadersPtr& headers); + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NProfiling diff --git a/yt/yt/library/profiling/solomon/private.h b/yt/yt/library/profiling/solomon/private.h index c0e53ab680..334287f8d5 100644 --- a/yt/yt/library/profiling/solomon/private.h +++ b/yt/yt/library/profiling/solomon/private.h @@ -10,6 +10,8 @@ YT_DEFINE_GLOBAL(const NLogging::TLogger, SolomonLogger, "Solomon"); inline const int DefaultProducerCollectionBatchSize = 100; +inline static const TString IsSolomonPullHeaderName = "X-YT-IsSolomonPull"; + //////////////////////////////////////////////////////////////////////////////// } // namespace NYT::NProfiling diff --git a/yt/yt/library/profiling/solomon/proxy.cpp b/yt/yt/library/profiling/solomon/proxy.cpp new file mode 100644 index 0000000000..b2e8256501 --- /dev/null +++ b/yt/yt/library/profiling/solomon/proxy.cpp @@ -0,0 +1,428 @@ +#include "proxy.h" +#include "private.h" + +#include <yt/yt/core/http/http.h> +#include <yt/yt/core/http/server.h> +#include <yt/yt/core/http/client.h> +#include <yt/yt/core/http/helpers.h> + +#include <yt/yt/core/logging/log.h> + +#include <yt/yt/core/ytree/fluent.h> + +#include <library/cpp/monlib/encode/json/json.h> +#include <library/cpp/monlib/encode/spack/spack_v1.h> +#include <library/cpp/monlib/encode/prometheus/prometheus.h> + +namespace NYT::NProfiling { + +using namespace NConcurrency; +using namespace NHttp; +using namespace NYTree; +using namespace NLogging; +using namespace NTracing; + +//////////////////////////////////////////////////////////////////////////////// + +const static auto& Logger = SolomonLogger; + +//////////////////////////////////////////////////////////////////////////////// + +static const TString ShardIndexParameterName("shard_index"); +static const TString ShardCountParameterName("shard_count"); +static const TString ComponentParameterName("component"); +static const TString InstanceParameterName("instance"); + +static const TString InstanceLabelParameterNamePrefix("instance_"); + +static const std::vector<TString> ForwardParameterWhitelist = { + "period", + "now", +}; + +static const std::vector<TString> ForwardHeaderWhitelist = { + "X-Solomon-GridSec", + "X-Solomon-ClusterId", +}; + +static constexpr int PullErrorSampleSize = 20; + +//////////////////////////////////////////////////////////////////////////////// + +namespace { + +//////////////////////////////////////////////////////////////////////////////// + +int ParseIntegerParameter(const TCgiParameters& parameters, const TString& parameterName, int defaultValue) +{ + int value = defaultValue; + + if (auto it = parameters.Find(parameterName); it != parameters.end()) { + if (!TryFromString<int>(it->second, value)) { + THROW_ERROR_EXCEPTION("Invalid value of %Qv parameter", parameterName) + << TErrorAttribute("value", it->second); + } + } + + return value; +} + +std::optional<TString> TrimPrefix(const TString& name, const TString& prefix) +{ + if (name.StartsWith(prefix)) { + return name.substr(prefix.size()); + } + + return {}; +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace + +//////////////////////////////////////////////////////////////////////////////// + +void TSolomonProxyConfig::Register(TRegistrar registrar) +{ + registrar.Parameter("public_component_names", &TThis::PublicComponentNames) + .Default(); + registrar.Parameter("max_endpoints_per_request", &TThis::MaxEndpointsPerRequest) + .Default(30); +} + +//////////////////////////////////////////////////////////////////////////////// + +TSolomonProxy::TSolomonProxy(TSolomonProxyConfigPtr config, IPollerPtr poller) + : Config_(std::move(config)) + , HttpClient_(CreateClient(New<TClientConfig>(), std::move(poller))) +{ } + +void TSolomonProxy::RegisterEndpointProvider(const IEndpointProviderPtr& endpointProvider) +{ + EmplaceOrCrash(ComponentNameToEndpointProvider_, endpointProvider->GetComponentName(), endpointProvider); +} + +void TSolomonProxy::UnregisterEndpointProvider(const IEndpointProviderPtr& endpointProvider) +{ + EraseOrCrash(ComponentNameToEndpointProvider_, endpointProvider->GetComponentName()); +} + +void TSolomonProxy::Register(const TString& prefix, const IServerPtr& server) +{ + server->AddHandler(prefix + "/sensors", BIND(&TSolomonProxy::HandleSensors, MakeStrong(this))); + // TODO(achulkov2): Expose available tags/targets? +} + +void TSolomonProxy::ValidateShardingParameters(int shardIndex, int shardCount) +{ + if (!(0 <= shardIndex && shardIndex < shardCount)) { + THROW_ERROR_EXCEPTION( + "Invalid sharding configuration, %v=%v must be in range [0, %v=%v)", + ShardIndexParameterName, + shardIndex, + ShardCountParameterName, + shardCount); + } +} + +THeadersPtr TSolomonProxy::PreparePullHeaders(const THeadersPtr& reqHeaders, const TOutputEncodingContext& outputContext) +{ + auto pullHeaders = New<THeaders>(); + // We use this format since it is the only one that supports compression. + pullHeaders->Set("Accept", TString(::NMonitoring::ContentTypeByFormat(::NMonitoring::EFormat::SPACK))); + pullHeaders->Set("Accept-Encoding", TString(::NMonitoring::ContentEncodingByCompression(::NMonitoring::ECompression::ZSTD))); + // We use a separate header to propagate whether or not this was a "solomon" pull. It affects counter-to-rate conversion. + pullHeaders->Set(IsSolomonPullHeaderName, outputContext.IsSolomonPull ? "1" : "0"); + + for (const auto& forwardHeader : ForwardHeaderWhitelist) { + if (auto value = reqHeaders->Find(forwardHeader)) { + pullHeaders->Set(forwardHeader, *value); + } + } + + return pullHeaders; +} + +TCgiParameters TSolomonProxy::PreparePullParameters(const TCgiParameters& parameters) +{ + TCgiParameters pullParameters; + for (const auto& forwardParamater : ForwardParameterWhitelist) { + if (auto valueIt = parameters.find(forwardParamater); valueIt != parameters.end()) { + pullParameters.insert(*valueIt); + } + } + + return pullParameters; +} + +TSolomonProxy::TComponentMatcher TSolomonProxy::GetComponentMatcher(const TCgiParameters& parameters) +{ + TComponentMatcher componentMatcher; + + if (parameters.Has(ComponentParameterName)) { + componentMatcher.Components.emplace(); + } + + for (const auto& componentName : parameters.Range(ComponentParameterName)) { + componentMatcher.Components->insert(componentName); + } + + return componentMatcher; +} + +bool TSolomonProxy::MatchComponent(const TComponentMatcher& componentMatcher, const IEndpointProviderPtr& endpointProvider) +{ + return !componentMatcher.Components || componentMatcher.Components->contains(endpointProvider->GetComponentName()); +} + +TSolomonProxy::TInstanceFilter TSolomonProxy::GetInstanceFilter(const TCgiParameters& parameters) +{ + TInstanceFilter instanceFilter; + + for (const auto& [name, value] : parameters) { + if (name == InstanceParameterName) { + if (!instanceFilter.Instances) { + instanceFilter.Instances.emplace(); + } + + instanceFilter.Instances->insert(value); + } else if (auto instanceParameterName = TrimPrefix(name, InstanceLabelParameterNamePrefix)) { + instanceFilter.LabelFilter[*instanceParameterName].insert(value); + } + } + + return instanceFilter; +} + +std::vector<IEndpointProvider::TEndpoint> TSolomonProxy::FilterInstances( + const TInstanceFilter& instanceFilter, + const std::vector<IEndpointProvider::TEndpoint>& endpoints) +{ + std::vector<IEndpointProvider::TEndpoint> filteredEndpoints; + filteredEndpoints.reserve(endpoints.size()); + + for (const auto& endpoint : endpoints) { + // Skip non-selected instances if instance names are specified. + if (instanceFilter.Instances && !instanceFilter.Instances->contains(endpoint.Name)) { + continue; + } + + // For all specified labels, skip instances that do not have the selected values of these labels (or any value). + bool matchesLabelFilter = true; + for (const auto& [label, acceptableValues] : instanceFilter.LabelFilter) { + auto instanceLabelValueIt = endpoint.Labels.find(label); + if (instanceLabelValueIt == endpoint.Labels.end() || !acceptableValues.contains(instanceLabelValueIt->second)) { + matchesLabelFilter = false; + break; + } + } + if (!matchesLabelFilter) { + continue; + } + + filteredEndpoints.push_back(endpoint); + } + + return filteredEndpoints; +} + +std::vector<TString> TSolomonProxy::CollectEndpoints(const TCgiParameters& parameters, int shardIndex, int shardCount) const +{ + auto componentMatcher = GetComponentMatcher(parameters); + auto instanceFilter = GetInstanceFilter(parameters); + + YT_LOG_DEBUG( + "Selecting endpoints (PublicComponentNames: %v, RequestedComponents: %v, RequestedInstances: %v, InstanceLabelFilter: %v)", + Config_->PublicComponentNames, + componentMatcher.Components, + instanceFilter.Instances, + instanceFilter.LabelFilter); + + int selectedComponents = 0; + int skippedByInternalFilter = 0; + int skippedByUserFilter = 0; + + std::vector<TString> allEndpoints; + + for (const auto& [componentName, endpointProvider] : ComponentNameToEndpointProvider_) { + YT_VERIFY(componentName == endpointProvider->GetComponentName()); + + if (Config_->PublicComponentNames && !Config_->PublicComponentNames->contains(componentName)) { + ++skippedByInternalFilter; + YT_LOG_DEBUG("Skipping non-public component (ComponentName: %v)", componentName); + continue; + } + + if (!MatchComponent(componentMatcher, endpointProvider)) { + ++skippedByUserFilter; + YT_LOG_DEBUG("Skipping non-requested component (ComponentName: %v)", componentName); + continue; + } + + ++selectedComponents; + + auto componentEndpoints = endpointProvider->GetEndpoints(); + auto selectedEndpointCountBefore = allEndpoints.size(); + auto filteredInstances = FilterInstances(instanceFilter, componentEndpoints); + for (const auto& endpoint : filteredInstances) { + if (FarmFingerprint(endpoint.Address) % shardCount == shardIndex) { + allEndpoints.push_back(endpoint.Address); + } + } + + // For debug purposes. + int shardEndpointCount = 0; + for (const auto& endpoint : componentEndpoints) { + if (FarmFingerprint(endpoint.Address) % shardCount == shardIndex) { + ++shardEndpointCount; + } + } + + YT_LOG_DEBUG( + "Selected component endpoints for pull (ComponentName: %v, SelectedEndpointCount: %v, TotalEndpointCount: %v, InstanceFilterMatchCount: %v, ShardEndpointCount: %v)", + componentName, + allEndpoints.size() - selectedEndpointCountBefore, + componentEndpoints.size(), + filteredInstances.size(), + shardEndpointCount); + } + + YT_LOG_DEBUG( + "Collected endpoints from components (EndpointCount: %v, ComponentCount: %v, ComponentSkippedByInternalFilterCount: %v, ComponentSkippedByUserFilterCount: %v)", + allEndpoints.size(), + selectedComponents, + skippedByInternalFilter, + skippedByUserFilter); + + return allEndpoints; +} + +void TSolomonProxy::HandleSensors(const IRequestPtr& req, const IResponseWriterPtr& rsp) +{ + try { + GuardedHandleSensors(req, rsp); + } catch(const std::exception& ex) { + YT_LOG_DEBUG(ex, "Failed to pull sensors from endpoints"); + + if (!rsp->AreHeadersFlushed()) { + try { + rsp->SetStatus(EStatusCode::InternalServerError); + rsp->GetHeaders()->Remove("Content-Type"); + rsp->GetHeaders()->Remove("Content-Encoding"); + + ReplyError(rsp, ex); + } catch (const std::exception& ex) { + YT_LOG_DEBUG(ex, "Failed to send sensor pull error"); + } + } + } +} + +void TSolomonProxy::GuardedHandleSensors(const IRequestPtr& req, const IResponseWriterPtr& rsp) +{ + auto reqUrlRef = req->GetUrl(); + TCgiParameters parameters(reqUrlRef.RawQuery); + + auto shardIndex = ParseIntegerParameter(parameters, ShardIndexParameterName, /*defaultValue*/ 0); + auto shardCount = ParseIntegerParameter(parameters, ShardCountParameterName, /*defaultValue*/ 1); + + YT_LOG_DEBUG( + "Performing solomon proxy fetch (ShardIndex: %v, ShardCount: %v, Query: %v)", + shardIndex, + shardCount, + reqUrlRef.RawQuery); + + ValidateShardingParameters(shardIndex, shardCount); + + auto outputEncodingContext = CreateOutputEncodingContextFromHeaders(req->GetHeaders()); + + auto filteredEndpoints = CollectEndpoints(parameters, shardIndex, shardCount); + + // TODO(achulkov2): Ideally, we would want some sort of memory accounting here. + if (std::ssize(filteredEndpoints) > Config_->MaxEndpointsPerRequest) { + THROW_ERROR_EXCEPTION("Cannot pull sensors from %v endpoints at once, please retry the request with a larger shard count", filteredEndpoints.size()) + << TErrorAttribute("max_endpoints_per_request", Config_->MaxEndpointsPerRequest); + } + + std::vector<TFuture<IResponsePtr>> asyncPullResponses; + asyncPullResponses.reserve(filteredEndpoints.size()); + + auto pullHeaders = PreparePullHeaders(req->GetHeaders(), outputEncodingContext); + auto pullParameters = PreparePullParameters(parameters); + + YT_LOG_DEBUG( + "Proxying pull requests to hosts (EndpointCount: %v, Parameters: %v, Headers: %v)", + filteredEndpoints.size(), + // This is easier to read than the escaped string. + std::vector<std::pair<TString, TString>>(pullParameters.begin(), pullParameters.end()), + pullHeaders->Dump()); + + for (auto pullUrl : filteredEndpoints) { + if (!pullParameters.empty()) { + pullUrl += Format("?%v", pullParameters.Print()); + } + + asyncPullResponses.push_back(HttpClient_->Get(pullUrl, pullHeaders)); + } + + auto pullResponseOrErrors = WaitFor(AllSet(asyncPullResponses)) + .ValueOrThrow(); + + std::vector<TError> pullErrors; + + for (const auto& pullResponseOrError : pullResponseOrErrors) { + if (!pullResponseOrError.IsOK()) { + YT_LOG_DEBUG(pullResponseOrError, "Error while pulling sensors from endpoint"); + pullErrors.push_back(pullResponseOrError); + continue; + } + + const auto& pullResponse = pullResponseOrError.Value(); + auto body = pullResponse->ReadAll(); + + if (pullResponse->GetStatusCode() != EStatusCode::OK) { + YT_LOG_DEBUG("Sensor pull failed (StatusCode: %v, Response: %v)", pullResponse->GetStatusCode(), body); + pullErrors.push_back( + TError("Sensor pull failed with status code %v", pullResponse->GetStatusCode()) + << TErrorAttribute("response_body", ToString(body)) + << TErrorAttribute("status_code", pullResponse->GetStatusCode())); + continue; + } + + TMemoryInput stream(body.Begin(), body.Size()); + ::NMonitoring::DecodeSpackV1(&stream, outputEncodingContext.Encoder.Get()); + } + + YT_LOG_DEBUG( + "Pulled sensors from endpoints (RequestCount: %v, SuccessCount: %v, FailureCount: %v)", + asyncPullResponses.size(), + asyncPullResponses.size() - pullErrors.size(), + pullErrors.size()); + + // TODO(achulkov2): Think about more introspection of pull failures for other cases. + if (!pullErrors.empty() && pullErrors.size() == asyncPullResponses.size()) { + auto totalErrorCount = pullErrors.size(); + if (std::ssize(pullErrors) > PullErrorSampleSize) { + pullErrors.resize(PullErrorSampleSize); + } + THROW_ERROR_EXCEPTION("Could not pull sensors from any endpoint") + << TErrorAttribute("sampled_error_count", pullErrors.size()) + << TErrorAttribute("total_error_count", totalErrorCount) + << pullErrors; + } + + // TODO(achulkov2): Maybe offload this to a separate thread pool like it is done in the exporter. + outputEncodingContext.Encoder->Close(); + + rsp->SetStatus(EStatusCode::OK); + FillResponseHeaders(outputEncodingContext, rsp->GetHeaders()); + + auto replyBlob = TSharedRef::FromString(outputEncodingContext.EncoderBuffer->Str()); + + WaitFor(rsp->WriteBody(replyBlob)) + .ThrowOnError(); +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NProfiling diff --git a/yt/yt/library/profiling/solomon/proxy.h b/yt/yt/library/profiling/solomon/proxy.h new file mode 100644 index 0000000000..937b098461 --- /dev/null +++ b/yt/yt/library/profiling/solomon/proxy.h @@ -0,0 +1,118 @@ +#pragma once + +#include "public.h" +#include "helpers.h" + +#include <yt/yt/core/http/public.h> + +#include <yt/yt/core/concurrency/public.h> + +#include <yt/yt/core/ytree/yson_struct.h> + +#include <library/cpp/yt/farmhash/farm_hash.h> + +#include <library/cpp/cgiparam/cgiparam.h> + +namespace NYT::NProfiling { + +//////////////////////////////////////////////////////////////////////////////// + +struct IEndpointProvider + : public TRefCounted +{ +public: + struct TEndpoint + { + //! Logical name of the endpoint. + TString Name; + //! Actual address to pull sensors from. + TString Address; + //! A set of labels which can be filtered upon. + THashMap<TString, TString> Labels; + }; + + //! Human-readable component name. + virtual TString GetComponentName() const = 0; + //! This method is allowed to return a cached response. + virtual std::vector<TEndpoint> GetEndpoints() const = 0; +}; + +DEFINE_REFCOUNTED_TYPE(IEndpointProvider) + +//////////////////////////////////////////////////////////////////////////////// + +struct TSolomonProxyConfig + : public NYTree::TYsonStruct +{ + //! A list of publicly available components. + //! Endpoints from other components are not pulled from. + //! Currently, this filter is applied for all requests. + //! TODO(achulkov2): Add some form of access control to allow bypassing this check. + std::optional<THashSet<TString>> PublicComponentNames; + + //! Forbids pulling sensors from too many endpoints within a single request. + //! Instead of increasing this limit, one should make requests with a larger shard count. + int MaxEndpointsPerRequest; + + REGISTER_YSON_STRUCT(TSolomonProxyConfig); + + static void Register(TRegistrar registrar); +}; + +DEFINE_REFCOUNTED_TYPE(TSolomonProxyConfig) + +//////////////////////////////////////////////////////////////////////////////// + +class TSolomonProxy + : public TRefCounted +{ +public: + TSolomonProxy(TSolomonProxyConfigPtr config, NConcurrency::IPollerPtr poller); + + void RegisterEndpointProvider(const IEndpointProviderPtr& endpointProvider); + void UnregisterEndpointProvider(const IEndpointProviderPtr& endpointProvider); + + void Register(const TString& prefix, const NHttp::IServerPtr& server); + +private: + const TSolomonProxyConfigPtr Config_; + const NHttp::IClientPtr HttpClient_; + THashMap<TString, IEndpointProviderPtr> ComponentNameToEndpointProvider_; + + void HandleSensors(const NHttp::IRequestPtr& req, const NHttp::IResponseWriterPtr& rsp); + void GuardedHandleSensors(const NHttp::IRequestPtr& req, const NHttp::IResponseWriterPtr& rsp); + + static void ValidateShardingParameters(int shardIndex, int shardCount); + + struct TComponentMatcher + { + std::optional<THashSet<TString>> Components; + }; + + static TComponentMatcher GetComponentMatcher(const TCgiParameters& parameters); + static bool MatchComponent(const TComponentMatcher& componentMatcher, const IEndpointProviderPtr& endpointProvider); + + struct TInstanceFilter + { + std::optional<THashSet<TString>> Instances; + THashMap<TString, THashSet<TString>> LabelFilter; + }; + + static TInstanceFilter GetInstanceFilter(const TCgiParameters& parameters); + static std::vector<IEndpointProvider::TEndpoint> FilterInstances( + const TInstanceFilter& instanceFilter, + const std::vector<IEndpointProvider::TEndpoint>& endpoints); + + std::vector<TString> CollectEndpoints(const TCgiParameters& parameters, int shardIndex, int shardCount) const; + + //! For safety we only proxy whitelisted headers. + static NHttp::THeadersPtr PreparePullHeaders(const NHttp::THeadersPtr& reqHeaders, const TOutputEncodingContext& outputContext); + //! For safety we only proxy whitelisted parameters. + static TCgiParameters PreparePullParameters(const TCgiParameters& parameters); +}; + +DEFINE_REFCOUNTED_TYPE(TSolomonProxy) + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NProfiling diff --git a/yt/yt/library/profiling/solomon/proxy_example/main.cpp b/yt/yt/library/profiling/solomon/proxy_example/main.cpp new file mode 100644 index 0000000000..5f01397780 --- /dev/null +++ b/yt/yt/library/profiling/solomon/proxy_example/main.cpp @@ -0,0 +1,80 @@ +#include <yt/yt/library/profiling/solomon/proxy.h> + +#include <yt/yt/core/concurrency/poller.h> +#include <yt/yt/core/concurrency/thread_pool.h> +#include <yt/yt/core/concurrency/thread_pool_poller.h> + +#include <yt/yt/core/http/server.h> + +#include <util/generic/yexception.h> + +using namespace NYT; +using namespace NYT::NHttp; +using namespace NYT::NConcurrency; +using namespace NYT::NProfiling; + +//////////////////////////////////////////////////////////////////////////////// + +class TStaticEndpointProvider + : public IEndpointProvider +{ +public: + TStaticEndpointProvider(std::vector<TString> addresses) + : Addresses_(std::move(addresses)) + { } + + TString GetComponentName() const override + { + return "static"; + } + + std::vector<TEndpoint> GetEndpoints() const override + { + std::vector<TEndpoint> endpoints; + for (const auto& address : Addresses_) { + endpoints.push_back({.Address = address}); + } + + return endpoints; + } + +private: + std::vector<TString> Addresses_; +}; + +int main(int argc, char* argv[]) +{ + try { + if (argc != 2) { + throw yexception() << "usage: " << argv[0] << " PORT"; + } + + auto port = FromString<int>(argv[1]); + auto poller = CreateThreadPoolPoller(1, "Example"); + auto server = CreateServer(port, poller); + + auto proxy = New<TSolomonProxy>(New<TSolomonProxyConfig>(), poller); + + std::vector<TString> addresses = { + "list", + "your", + "hosts", + "here" + }; + auto staticEndpointProvider = New<TStaticEndpointProvider>(addresses); + proxy->RegisterEndpointProvider(staticEndpointProvider); + + proxy->Register("/solomon", server); + + server->Start(); + + TDelayedExecutor::WaitForDuration(TDuration::Max()); + } catch (const std::exception& ex) { + Cerr << ex.what() << Endl; + _exit(1); + } + + return 0; +} + +//////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/library/profiling/solomon/proxy_example/ya.make b/yt/yt/library/profiling/solomon/proxy_example/ya.make new file mode 100644 index 0000000000..ec8e898f5d --- /dev/null +++ b/yt/yt/library/profiling/solomon/proxy_example/ya.make @@ -0,0 +1,11 @@ +PROGRAM(proxy-example) + +INCLUDE(${ARCADIA_ROOT}/yt/ya_cpp.make.inc) + +SRCS(main.cpp) + +PEERDIR( + yt/yt/library/profiling/solomon +) + +END() diff --git a/yt/yt/library/profiling/solomon/public.h b/yt/yt/library/profiling/solomon/public.h index 02bf5ad2c9..d497d9f9b9 100644 --- a/yt/yt/library/profiling/solomon/public.h +++ b/yt/yt/library/profiling/solomon/public.h @@ -8,8 +8,13 @@ namespace NYT::NProfiling { DECLARE_REFCOUNTED_STRUCT(TShardConfig) DECLARE_REFCOUNTED_STRUCT(TSolomonExporterConfig) +DECLARE_REFCOUNTED_STRUCT(TSolomonProxyConfig) + DECLARE_REFCOUNTED_CLASS(TSolomonExporter) DECLARE_REFCOUNTED_CLASS(TSolomonRegistry) +DECLARE_REFCOUNTED_CLASS(TSolomonProxy) + +DECLARE_REFCOUNTED_STRUCT(IEndpointProvider) //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/library/profiling/solomon/ya.make b/yt/yt/library/profiling/solomon/ya.make index 4b20516ed2..d578cc3e76 100644 --- a/yt/yt/library/profiling/solomon/ya.make +++ b/yt/yt/library/profiling/solomon/ya.make @@ -5,8 +5,10 @@ INCLUDE(${ARCADIA_ROOT}/yt/ya_cpp.make.inc) SRCS( cube.cpp exporter.cpp + helpers.cpp percpu.cpp producer.cpp + proxy.cpp registry.cpp remote.cpp sensor.cpp @@ -31,3 +33,7 @@ PEERDIR( ) END() + +RECURSE( + proxy_example +) |