diff options
author | Ivan Sukhov <evanevannnn@ydb.tech> | 2025-03-25 18:48:02 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2025-03-25 18:48:02 +0300 |
commit | e88b590555a32f7c43cc0a78612059b769fca2af (patch) | |
tree | 1a0648781afacd0342b0d657b8a3a71bcf413764 | |
parent | 320040026605888ad720d8065822a442a27d221f (diff) | |
download | ydb-e88b590555a32f7c43cc0a78612059b769fca2af.tar.gz |
Solomon read actor refactoring (#15662)
28 files changed, 1180 insertions, 442 deletions
diff --git a/ydb/library/yql/providers/solomon/actors/dq_solomon_metrics_queue.cpp b/ydb/library/yql/providers/solomon/actors/dq_solomon_metrics_queue.cpp index 75c7ccb1671..b3570cf19d4 100644 --- a/ydb/library/yql/providers/solomon/actors/dq_solomon_metrics_queue.cpp +++ b/ydb/library/yql/providers/solomon/actors/dq_solomon_metrics_queue.cpp @@ -47,9 +47,9 @@ public: "expected EvEnd <= EventSpaceEnd(TEvents::ES_PRIVATE)"); struct TEvNextListingChunkReceived : public NActors::TEventLocal<TEvNextListingChunkReceived, EvNextListingChunkReceived> { - NSo::ISolomonAccessorClient::TListMetricsResult ListingResult; - TEvNextListingChunkReceived(NSo::ISolomonAccessorClient::TListMetricsResult listingResult) - : ListingResult(std::move(listingResult)) {}; + NSo::TListMetricsResponse Response; + explicit TEvNextListingChunkReceived(NSo::TListMetricsResponse&& response) + : Response(std::move(response)) {} }; struct TEvRoundRobinStageTimeout : public NActors::TEventLocal<TEvRoundRobinStageTimeout, EvRoundRobinStageTimeout> { @@ -90,6 +90,7 @@ public: STATEFN(ThereAreMetricsToListState) { try { switch (const auto etype = ev->GetTypeRewrite()) { + hFunc(TEvSolomonProvider::TEvUpdateConsumersCount, HandleUpdateConsumersCount); hFunc(TEvSolomonProvider::TEvGetNextBatch, HandleGetNextBatch); hFunc(TEvPrivatePrivate::TEvNextListingChunkReceived, HandleNextListingChunkReceived); cFunc(TEvPrivatePrivate::EvRoundRobinStageTimeout, HandleRoundRobinStageTimeout); @@ -109,6 +110,7 @@ public: STATEFN(NoMoreMetricsState) { try { switch (const auto etype = ev->GetTypeRewrite()) { + hFunc(TEvSolomonProvider::TEvUpdateConsumersCount, HandleUpdateConsumersCount); hFunc(TEvSolomonProvider::TEvGetNextBatch, HandleGetNextBatchForEmptyState); cFunc(TEvPrivatePrivate::EvRoundRobinStageTimeout, HandleRoundRobinStageTimeout); cFunc(NActors::TEvents::TSystem::Poison, HandlePoison); @@ -126,6 +128,7 @@ public: STATEFN(AnErrorOccurredState) { try { switch (const auto etype = ev->GetTypeRewrite()) { + hFunc(TEvSolomonProvider::TEvUpdateConsumersCount, HandleUpdateConsumersCount); hFunc(TEvSolomonProvider::TEvGetNextBatch, HandleGetNextBatchForErrorState); cFunc(TEvPrivatePrivate::EvRoundRobinStageTimeout, HandleRoundRobinStageTimeout); cFunc(NActors::TEvents::TSystem::Poison, HandlePoison); @@ -141,6 +144,15 @@ public: } private: + void HandleUpdateConsumersCount(TEvSolomonProvider::TEvUpdateConsumersCount::TPtr& ev) { + if (const auto [it, inserted] = UpdatedConsumers.emplace(ev->Sender); inserted) { + LOG_D("TDqSolomonMetricsQueueActor", + "HandleUpdateConsumersCount Reducing ConsumersCount by " << ev->Get()->Record.GetConsumersCountDelta() << ", received from " << ev->Sender); + ConsumersCount -= ev->Get()->Record.GetConsumersCountDelta(); + } + Send(ev->Sender, new TEvSolomonProvider::TEvAck(ev->Get()->Record.GetTransportMeta())); + } + void HandleGetNextBatch(TEvSolomonProvider::TEvGetNextBatch::TPtr& ev) { if (HasEnoughToSend()) { LOG_I("TDqSolomonMetricsQueueActor", "HandleGetNextBatch has enough metrics to send, trying to send them"); @@ -157,7 +169,7 @@ private: YQL_ENSURE(ListingFuture.Defined()); ListingFuture = Nothing(); LOG_D("TDqSolomonMetricsQueueActor", "HandleNextListingChunkReceived"); - if (SaveRetrievedResults(ev->Get()->ListingResult)) { + if (SaveRetrievedResults(ev->Get()->Response)) { AnswerPendingRequests(true); if (!HasPendingRequests) { LOG_D("TDqSolomonMetricsQueueActor", "HandleNextListingChunkReceived no pending requests. Trying to prefetch"); @@ -213,22 +225,22 @@ private: Become(&TDqSolomonMetricsQueueActor::AnErrorOccurredState); } - bool SaveRetrievedResults(const NSo::ISolomonAccessorClient::TListMetricsResult& listingResult) { + bool SaveRetrievedResults(const NSo::TListMetricsResponse& response) { LOG_T("TDqSolomonMetricsQueueActor", "SaveRetrievedResults"); - if (!listingResult.Success) { - MaybeIssues = listingResult.ErrorMsg; + if (response.Status != NSo::EStatus::STATUS_OK) { + MaybeIssues = response.Error; return false; } - if (CurrentPage >= listingResult.PagesCount) { + if (CurrentPage >= response.Result.PagesCount) { LOG_I("TDqSolomonMetricsQueueActor", "SaveRetrievedResults no more metrics to list"); HasMoreMetrics = false; Become(&TDqSolomonMetricsQueueActor::NoMoreMetricsState); } - LOG_D("TDqSolomonMetricsQueueActor", "SaveRetrievedResults saving: " << listingResult.Result.size() << " metrics"); - for (const auto& metric : listingResult.Result) { - NSo::MetricQueue::TMetricLabels protoMetric; + LOG_D("TDqSolomonMetricsQueueActor", "SaveRetrievedResults saving: " << response.Result.Metrics.size() << " metrics"); + for (const auto& metric : response.Result.Metrics) { + NSo::MetricQueue::TMetric protoMetric; protoMetric.SetType(metric.Type); protoMetric.MutableLabels()->insert(metric.Labels.begin(), metric.Labels.end()); Metrics.emplace_back(std::move(protoMetric)); @@ -266,12 +278,11 @@ private: SolomonClient ->ListMetrics(ReadParams.Source.GetSelectors(), PageSize, CurrentPage++) .Subscribe([actorSystem, selfId = SelfId()]( - const NThreading::TFuture<NSo::ISolomonAccessorClient::TListMetricsResult>& future) -> void { + NThreading::TFuture<NSo::TListMetricsResponse> future) -> void { try { actorSystem->Send( selfId, - new TEvPrivatePrivate::TEvNextListingChunkReceived( - std::move(future.GetValue()))); + new TEvPrivatePrivate::TEvNextListingChunkReceived(future.ExtractValue())); } catch (const std::exception& e) { actorSystem->Send( selfId, @@ -347,7 +358,8 @@ private: void SendMetrics(const NActors::TActorId& consumer, const NDqProto::TMessageTransportMeta& transportMeta) { YQL_ENSURE(!MaybeIssues.Defined()); - std::vector<NSo::MetricQueue::TMetricLabels> result; + std::vector<NSo::MetricQueue::TMetric> result; + result.reserve(std::min<ui64>(BatchCountLimit, Metrics.size())); while (!Metrics.empty() && result.size() < BatchCountLimit) { result.push_back(Metrics.back()); Metrics.pop_back(); @@ -396,13 +408,14 @@ private: bool IsRoundRobinFinishScheduled = false; bool RoundRobinStageFinished = false; THashSet<NActors::TActorId> StartedConsumers; + THashSet<NActors::TActorId> UpdatedConsumers; THashSet<NActors::TActorId> FinishedConsumers; THashMap<NActors::TActorId, ui64> FinishingConsumerToLastSeqNo; - TMaybe<NThreading::TFuture<NSo::ISolomonAccessorClient::TListMetricsResult>> ListingFuture; + TMaybe<NThreading::TFuture<NSo::TListMetricsResponse>> ListingFuture; bool HasPendingRequests; THashMap<NActors::TActorId, TDeque<NDqProto::TMessageTransportMeta>> PendingRequests; - std::vector<NSo::MetricQueue::TMetricLabels> Metrics; + std::vector<NSo::MetricQueue::TMetric> Metrics; TMaybe<TString> MaybeIssues; ui64 PageSize; diff --git a/ydb/library/yql/providers/solomon/actors/dq_solomon_read_actor.cpp b/ydb/library/yql/providers/solomon/actors/dq_solomon_read_actor.cpp index a607eb88e17..953d8bdeb22 100644 --- a/ydb/library/yql/providers/solomon/actors/dq_solomon_read_actor.cpp +++ b/ydb/library/yql/providers/solomon/actors/dq_solomon_read_actor.cpp @@ -3,6 +3,7 @@ #include <library/cpp/protobuf/util/pb_io.h> +#include <util/string/join.h> #include <ydb/library/yql/dq/actors/common/retry_queue.h> #include <ydb/library/yql/providers/solomon/actors/dq_solomon_metrics_queue.h> #include <ydb/library/yql/providers/solomon/events/events.h> @@ -65,29 +66,14 @@ using namespace NKikimr::NMiniKQL; namespace { -enum ESystemColumn{ - SC_KIND = 0, - SC_LABELS, - SC_VALUE, - SC_TYPE, - SC_TS -}; - -auto RetryPolicy = NYql::NDq::THttpSenderRetryPolicy::GetExponentialBackoffPolicy( - [](const NHttp::TEvHttpProxy::TEvHttpIncomingResponse* resp){ - if (!resp || !resp->Response) { - // Connection wasn't established. Should retry. - return ERetryErrorClass::ShortRetry; - } - - if (resp->Response->Status == "401") { - return ERetryErrorClass::NoRetry; - } - - return ERetryErrorClass::ShortRetry; - }); - class TDqSolomonReadActor : public NActors::TActorBootstrapped<TDqSolomonReadActor>, public IDqComputeActorAsyncInput { +private: + struct TMetricTimeRange { + NSo::TMetric Metric; + TInstant From; + TInstant To; + }; + public: static constexpr char ActorName[] = "DQ_SOLOMON_READ_ACTOR"; @@ -99,8 +85,8 @@ public: const THolderFactory& holderFactory, NKikimr::NMiniKQL::TProgramBuilder& programBuilder, TDqSolomonReadParams&& readParams, - ui64 maxInflightDataRequests, ui64 computeActorBatchSize, + ui64 metricsQueueConsumersCountDelta, NActors::TActorId metricsQueueActor, const ::NMonitoring::TDynamicCounterPtr& counters, std::shared_ptr<NYdb::ICredentialsProvider> credentialsProvider @@ -112,12 +98,13 @@ public: , ProgramBuilder(programBuilder) , LogPrefix(TStringBuilder() << "TxId: " << TxId << ", TDqSolomonReadActor: ") , ReadParams(std::move(readParams)) - , MaxInflightDataRequests(maxInflightDataRequests) , ComputeActorBatchSize(computeActorBatchSize) + , MetricsQueueConsumersCountDelta(metricsQueueConsumersCountDelta) , MetricsQueueActor(metricsQueueActor) , CredentialsProvider(credentialsProvider) , SolomonClient(NSo::ISolomonAccessorClient::Make(ReadParams.Source, CredentialsProvider)) { + assert(MaxPointsPerOneMetric != 0); Y_UNUSED(counters); SOURCE_LOG_D("Init"); IngressStats.Level = statsLevel; @@ -141,26 +128,38 @@ public: } void Bootstrap() { - Become(&TDqSolomonReadActor::StateFunc); if (UseMetricsQueue) { + Become(&TDqSolomonReadActor::LimitlessModeState); MetricsQueueEvents.Init(TxId, SelfId(), SelfId()); MetricsQueueEvents.OnNewRecipientId(MetricsQueueActor); + + if (MetricsQueueConsumersCountDelta > 0) { + MetricsQueueEvents.Send(new TEvSolomonProvider::TEvUpdateConsumersCount(MetricsQueueConsumersCountDelta)); + } + RequestMetrics(); } else { + Become(&TDqSolomonReadActor::LimitedModeState); RequestData(); } } - STRICT_STFUNC(StateFunc, + STRICT_STFUNC(LimitlessModeState, hFunc(TEvSolomonProvider::TEvMetricsBatch, HandleMetricsBatch); hFunc(TEvSolomonProvider::TEvMetricsReadError, HandleMetricsReadError); + hFunc(TEvSolomonProvider::TEvPointsCountBatch, HandlePointsCountBatch); hFunc(TEvSolomonProvider::TEvNewDataBatch, HandleNewDataBatch); + hFunc(TEvSolomonProvider::TEvAck, Handle); hFunc(NYql::NDq::TEvRetryQueuePrivate::TEvRetry, Handle); hFunc(NActors::TEvInterconnect::TEvNodeDisconnected, Handle); hFunc(NActors::TEvInterconnect::TEvNodeConnected, Handle); hFunc(NActors::TEvents::TEvUndelivered, Handle); ) + STRICT_STFUNC(LimitedModeState, + hFunc(TEvSolomonProvider::TEvNewDataBatch, HandleNewDataBatchLimited); + ) + void HandleMetricsBatch(TEvSolomonProvider::TEvMetricsBatch::TPtr& metricsBatch) { if (!MetricsQueueEvents.OnEventReceived(metricsBatch)) { return; @@ -176,13 +175,16 @@ public: IsConfirmedMetricsQueueFinish = true; } - auto& metrics = batch.GetMetrics(); + auto& listedMetrics = batch.GetMetrics(); - SOURCE_LOG_D("HandleMetricsBatch batch of size " << metrics.size()); - Metrics.insert(Metrics.end(), metrics.begin(), metrics.end()); - ListedMetrics += metrics.size(); + SOURCE_LOG_D("HandleMetricsBatch batch of size " << listedMetrics.size()); + for (const auto& metric : listedMetrics) { + std::map<TString, TString> labels(metric.GetLabels().begin(), metric.GetLabels().end()); + ListedMetrics.emplace_back(std::move(labels), metric.GetType()); + } + ListedMetricsCount += listedMetrics.size(); - while (TryRequestData()) {} + while (TryRequestPointsCount()) {} if (LastMetricProcessed()) { NotifyComputeActorWithData(); @@ -202,32 +204,58 @@ public: } TIssues issues { TIssue(metricsReadError->Get()->Record.GetIssues()) }; - SOURCE_LOG_W("Got " << "error response[" << metricsReadError->Cookie << "] from solomon: " << issues.ToOneLineString()); + SOURCE_LOG_W("Got " << "error list metrics response[" << metricsReadError->Cookie << "] from solomon: " << issues.ToOneLineString()); Send(ComputeActorId, new TEvAsyncInputError(InputIndex, issues, NYql::NDqProto::StatusIds::EXTERNAL_ERROR)); return; } + void HandlePointsCountBatch(TEvSolomonProvider::TEvPointsCountBatch::TPtr& pointsCountBatch) { + auto& batch = *pointsCountBatch->Get(); + + if (batch.Response.Status != NSo::EStatus::STATUS_OK) { + TIssues issues { TIssue(batch.Response.Error) }; + SOURCE_LOG_W("Got " << "error points count response[" << pointsCountBatch->Cookie << "] from solomon: " << issues.ToOneLineString()); + Send(ComputeActorId, new TEvAsyncInputError(InputIndex, issues, NYql::NDqProto::StatusIds::EXTERNAL_ERROR)); + return; + } + + auto& metrics = batch.Metrics; + auto& pointsCount = batch.Response.Result.PointsCount; + ParsePointsCount(metrics, pointsCount); + + SOURCE_LOG_D("HandlePointsCountBatch batch of size " << metrics.size()); + TryRequestData(); + } + void HandleNewDataBatch(TEvSolomonProvider::TEvNewDataBatch::TPtr& newDataBatch) { auto& batch = *newDataBatch->Get(); - InflightDataRequests--; - if (!batch.Result.Success) { - TIssues issues { TIssue(batch.Result.ErrorMsg) }; - SOURCE_LOG_W("Got " << "error response[" << newDataBatch->Cookie << "] from solomon: " << issues.ToOneLineString()); + if (batch.Response.Status == NSo::EStatus::STATUS_FATAL_ERROR) { + TIssues issues { TIssue(batch.Response.Error) }; + SOURCE_LOG_W("Got " << "error data response[" << newDataBatch->Cookie << "] from solomon: " << issues.ToOneLineString()); Send(ComputeActorId, new TEvAsyncInputError(InputIndex, issues, NYql::NDqProto::StatusIds::EXTERNAL_ERROR)); return; } + if (batch.Response.Status == NSo::EStatus::STATUS_RETRIABLE_ERROR) { + MetricsWithTimeRange.emplace_back(batch.Metric, batch.From, batch.To); + TryRequestData(); + return; + } - MetricsData.insert(MetricsData.end(), batch.Result.Result.begin(), batch.Result.Result.end()); - CompletedMetrics += batch.SelectorsCount; + MetricsData.insert(MetricsData.end(), batch.Response.Result.Timeseries.begin(), batch.Response.Result.Timeseries.end()); + CompletedMetricsCount++; - if (!Metrics.empty()) { - while (TryRequestData()) {} - } else if (MetricsData.size() >= ComputeActorBatchSize || LastMetricReceived()) { + if (!MetricsWithTimeRange.empty()) { + TryRequestData(); + } else if (MetricsData.size() >= ComputeActorBatchSize || LastMetricProcessed()) { NotifyComputeActorWithData(); } } + void Handle(TEvSolomonProvider::TEvAck::TPtr& ev) { + MetricsQueueEvents.OnEventReceived(ev); + } + void Handle(const NYql::NDq::TEvRetryQueuePrivate::TEvRetry::TPtr&) { SOURCE_LOG_D("Handle MetricsQueue retry"); MetricsQueueEvents.Retry(); @@ -251,13 +279,28 @@ public: } } - i64 GetAsyncInputData(TUnboxedValueBatch& buffer, TMaybe<TInstant>&, bool& finished, i64 freeSpace) final { - Y_UNUSED(freeSpace); + void HandleNewDataBatchLimited(TEvSolomonProvider::TEvNewDataBatch::TPtr& newDataBatch) { + auto& batch = *newDataBatch->Get(); + + if (batch.Response.Status != NSo::EStatus::STATUS_OK) { + TIssues issues { TIssue(batch.Response.Error) }; + SOURCE_LOG_W("Got " << "error data response[" << newDataBatch->Cookie << "] from solomon: " << issues.ToOneLineString()); + Send(ComputeActorId, new TEvAsyncInputError(InputIndex, issues, NYql::NDqProto::StatusIds::EXTERNAL_ERROR)); + return; + } + + MetricsData.insert(MetricsData.end(), batch.Response.Result.Timeseries.begin(), batch.Response.Result.Timeseries.end()); + CompletedMetricsCount++; + + NotifyComputeActorWithData(); + } + + i64 GetAsyncInputData(TUnboxedValueBatch& buffer, TMaybe<TInstant>&, bool& finished, i64) final { YQL_ENSURE(!buffer.IsWide(), "Wide stream is not supported"); - SOURCE_LOG_D("GetAsyncInputData sending " << MetricsData.size() << " metrics"); + SOURCE_LOG_D("GetAsyncInputData sending " << MetricsData.size() << " metrics, finished = " << LastMetricProcessed()); for (const auto& data : MetricsData) { - auto& labels = data.Labels; + auto& labels = data.Metric.Labels; auto dictValueBuilder = HolderFactory.NewDict(DictType, 0); for (auto& [key, value] : labels) { @@ -267,16 +310,12 @@ public: auto& timestamps = data.Timestamps; auto& values = data.Values; - auto& type = data.Type; + auto& type = data.Metric.Type; for (size_t i = 0; i < timestamps.size(); ++i){ NUdf::TUnboxedValue* items = nullptr; auto value = HolderFactory.CreateDirectArrayHolder(ReadParams.Source.GetSystemColumns().size() + ReadParams.Source.GetLabelNames().size(), items); - if (auto it = Index.find(SOLOMON_SCHEME_LABELS); it != Index.end()) { - items[it->second] = dictValue; - } - if (auto it = Index.find(SOLOMON_SCHEME_VALUE); it != Index.end()) { items[it->second] = NUdf::TUnboxedValuePod(values[i]); } @@ -290,6 +329,10 @@ public: items[it->second] = NUdf::TUnboxedValuePod((ui64)timestamps[i] / 1000); } + if (auto it = Index.find(SOLOMON_SCHEME_LABELS); it != Index.end()) { + items[it->second] = dictValue; + } + for (const auto& c : ReadParams.Source.GetLabelNames()) { auto& v = items[Index[c]]; auto it = labels.find(c); @@ -325,7 +368,10 @@ public: private: // IActor & IDqComputeActorAsyncInput void PassAway() override { // Is called from Compute Actor - SOURCE_LOG_I("PassAway, processed " << CompletedMetrics << " metrics."); + SOURCE_LOG_I("PassAway, processed " << CompletedMetricsCount << " metrics."); + if (UseMetricsQueue) { + MetricsQueueEvents.Unsubscribe(); + } TActor<TDqSolomonReadActor>::PassAway(); } @@ -337,24 +383,15 @@ private: Send(ComputeActorId, new TEvNewAsyncInputDataArrived(InputIndex)); } - bool LastMetricReceived() const { - if (UseMetricsQueue) { - return IsConfirmedMetricsQueueFinish && CompletedMetrics == ListedMetrics; - } else { - return CompletedMetrics == 1; - } - } - bool LastMetricProcessed() const { if (UseMetricsQueue) { - return IsConfirmedMetricsQueueFinish && CompletedMetrics == ListedMetrics; - } else { - return MetricsData.empty() && CompletedMetrics == 1; + return IsMetricsQueueEmpty && CompletedMetricsCount == ListedMetricsCount; } + return CompletedMetricsCount == 1; } void TryRequestMetrics() { - if (Metrics.size() < MetricsPerDataQuery * MaxInflightDataRequests && !IsMetricsQueueEmpty && !IsWaitingMetricsQueueResponse) { + if (ListedMetrics.size() < 1000 && !IsMetricsQueueEmpty && !IsWaitingMetricsQueueResponse) { RequestMetrics(); } } @@ -364,55 +401,112 @@ private: IsWaitingMetricsQueueResponse = true; } - bool TryRequestData() { + bool TryRequestPointsCount() { TryRequestMetrics(); - if (Metrics.empty()) { + if (ListedMetrics.empty()) { return false; } - if (InflightDataRequests >= MaxInflightDataRequests) { - return false; + RequestPointsCount(); + return true; + } + + void RequestPointsCount() { + std::vector<NSo::TMetric> requestMetrics; + requestMetrics.reserve(std::min<ui64>(MetricsPerPointsCountQuery, ListedMetrics.size())); + while (!ListedMetrics.empty() && requestMetrics.size() < MetricsPerPointsCountQuery) { + requestMetrics.push_back(ListedMetrics.back()); + ListedMetrics.pop_back(); } - RequestData(); - return true; + auto getPointsCountFuture = SolomonClient->GetPointsCount(std::move(requestMetrics)); + + NActors::TActorSystem* actorSystem = NActors::TActivationContext::ActorSystem(); + getPointsCountFuture.Subscribe([actorSystem, metrics = std::move(requestMetrics), selfId = SelfId()]( + const NThreading::TFuture<NSo::TGetPointsCountResponse>& response) mutable -> void + { + actorSystem->Send(selfId, new TEvSolomonProvider::TEvPointsCountBatch( + std::move(metrics), + response.GetValue()) + ); + }); + } + + void TryRequestData() { + TryRequestPointsCount(); + while (!MetricsWithTimeRange.empty()) { + RequestData(); + TryRequestPointsCount(); + } } void RequestData() { - std::vector<TString> dataSelectors; + NThreading::TFuture<NSo::TGetDataResponse> getDataFuture; + NSo::TMetric metric; + TInstant from; + TInstant to; + if (UseMetricsQueue) { - while (Metrics.size() > 0 && dataSelectors.size() < MetricsPerDataQuery) { - dataSelectors.push_back(BuildSelectorsString(Metrics.back())); - Metrics.pop_back(); - } + auto request = MetricsWithTimeRange.back(); + MetricsWithTimeRange.pop_back(); + + metric = request.Metric; + from = request.From; + to = request.To; + + getDataFuture = SolomonClient->GetData(metric, from, to); } else { - dataSelectors.push_back(ReadParams.Source.GetProgram()); + getDataFuture = SolomonClient->GetData( + ReadParams.Source.GetProgram(), + TInstant::Seconds(ReadParams.Source.GetFrom()), + TInstant::Seconds(ReadParams.Source.GetTo()) + ); } - auto getDataFuture = SolomonClient->GetData(dataSelectors); - InflightDataRequests++; - NActors::TActorSystem* actorSystem = NActors::TActivationContext::ActorSystem(); - getDataFuture.Subscribe([actorSystem, selectorsCount = dataSelectors.size(), selfId = SelfId()]( - const NThreading::TFuture<NSo::ISolomonAccessorClient::TGetDataResult>& result) -> void + getDataFuture.Subscribe([actorSystem, metric, from, to, selfId = SelfId()]( + const NThreading::TFuture<NSo::TGetDataResponse>& response) -> void { - actorSystem->Send(selfId, new TEvSolomonProvider::TEvNewDataBatch(selectorsCount, result.GetValue())); + actorSystem->Send(selfId, new TEvSolomonProvider::TEvNewDataBatch( + metric, + from, + to, + response.GetValue()) + ); }); } - TString BuildSelectorsString(const NSo::MetricQueue::TMetricLabels& metric) const { - TStringBuilder result; - bool first = true; + void ParsePointsCount(const std::vector<NSo::TMetric>& metrics, const std::vector<ui64>& pointsCount) { + TInstant from = TInstant::Seconds(ReadParams.Source.GetFrom()); + TInstant to = TInstant::Seconds(ReadParams.Source.GetTo()); + + for (size_t i = 0; i < metrics.size(); ++i) { + auto& metric = metrics[i]; + auto& count = pointsCount[i]; - result << "{"; - for (const auto& [key, value] : metric.GetLabels()) { - if (!first) { - result << ","; + auto ranges = SplitTimeIntervalIntoRanges(from, to, count); + for (const auto& [fromRange, toRange] : ranges) { + MetricsWithTimeRange.emplace_back(metric, fromRange, toRange); } - first = false; - result << key << "=\"" << value << "\""; } - result << "}"; + } + + std::vector<std::pair<TInstant, TInstant>> SplitTimeIntervalIntoRanges(TInstant from, TInstant to, ui64 pointsCount) const { + std::vector<std::pair<TInstant, TInstant>> result; + if (pointsCount == 0) { + return result; + } + + result.reserve(pointsCount / MaxPointsPerOneMetric); + auto rangeDuration = to - from; + for (ui64 i = 0; i < pointsCount; i += MaxPointsPerOneMetric) { + double start = i; + double end = std::min(i + MaxPointsPerOneMetric, pointsCount); + result.emplace_back( + from + rangeDuration * start / pointsCount, + from + rangeDuration * end / pointsCount + ); + } return result; } @@ -426,8 +520,8 @@ private: NKikimr::NMiniKQL::TProgramBuilder& ProgramBuilder; const TString LogPrefix; const TDqSolomonReadParams ReadParams; - const ui64 MaxInflightDataRequests; const ui64 ComputeActorBatchSize; + const ui64 MetricsQueueConsumersCountDelta; bool UseMetricsQueue; TRetryEventsQueue MetricsQueueEvents; @@ -435,12 +529,14 @@ private: bool IsWaitingMetricsQueueResponse = false; bool IsMetricsQueueEmpty = false; bool IsConfirmedMetricsQueueFinish = false; - std::deque<NSo::MetricQueue::TMetricLabels> Metrics; + + std::deque<NSo::TMetric> ListedMetrics; + std::deque<TMetricTimeRange> MetricsWithTimeRange; std::deque<NSo::TTimeseries> MetricsData; - size_t InflightDataRequests = 0; - size_t ListedMetrics = 0; - size_t CompletedMetrics = 0; - const ui64 MetricsPerDataQuery = 15; + size_t ListedMetricsCount = 0; + size_t CompletedMetricsCount = 0; + const ui64 MetricsPerPointsCountQuery = 50; + const ui64 MaxPointsPerOneMetric = 1000000; TString SourceId; std::shared_ptr<NYdb::ICredentialsProvider> CredentialsProvider; @@ -456,6 +552,7 @@ private: std::pair<NYql::NDq::IDqComputeActorAsyncInput*, NActors::IActor*> CreateDqSolomonReadActor( NYql::NSo::NProto::TDqSolomonSource&& source, ui64 inputIndex, + ui64 metricsQueueConsumersCountDelta, TCollectStatsLevel statsLevel, const TTxId& txId, const NActors::TActorId& computeActorId, @@ -482,11 +579,6 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, NActors::IActor*> CreateDqSolom metricsQueueActor = ActorIdFromProto(protoId); } - ui64 maxInflightDataRequests = 1; - if (auto it = settings.find("maxInflightDataRequests"); it != settings.end()) { - maxInflightDataRequests = FromString<ui64>(it->second); - } - ui64 computeActorBatchSize = 1; if (auto it = settings.find("computeActorBatchSize"); it != settings.end()) { computeActorBatchSize = FromString<ui64>(it->second); @@ -503,8 +595,8 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, NActors::IActor*> CreateDqSolom holderFactory, programBuilder, std::move(params), - maxInflightDataRequests, computeActorBatchSize, + metricsQueueConsumersCountDelta, metricsQueueActor, counters, credentialsProvider); @@ -519,9 +611,15 @@ void RegisterDQSolomonReadActorFactory(TDqAsyncIoFactory& factory, ISecuredServi { auto counters = MakeIntrusive<::NMonitoring::TDynamicCounters>(); + ui64 metricsQueueConsumersCountDelta = 0; + if (args.ReadRanges.size() > 1) { + metricsQueueConsumersCountDelta = args.ReadRanges.size() - 1; + } + return CreateDqSolomonReadActor( std::move(settings), args.InputIndex, + metricsQueueConsumersCountDelta, args.StatsLevel, args.TxId, args.ComputeActorId, diff --git a/ydb/library/yql/providers/solomon/events/events.h b/ydb/library/yql/providers/solomon/events/events.h index 3fd23f5ae97..180494ac211 100644 --- a/ydb/library/yql/providers/solomon/events/events.h +++ b/ydb/library/yql/providers/solomon/events/events.h @@ -12,16 +12,36 @@ struct TEvSolomonProvider { EvBegin = EventSpaceBegin(NKikimr::TKikimrEvents::ES_SOLOMON_PROVIDER), // lister events - EvGetNextBatch = EvBegin, + EvUpdateConsumersCount = EvBegin, + EvAck, + EvGetNextBatch, EvMetricsBatch, EvMetricsReadError, // read actor events + EvPointsCountBatch, EvNewDataBatch, EvEnd }; - static_assert(EvEnd < EventSpaceEnd(NKikimr::TKikimrEvents::ES_SOLOMON_PROVIDER), "expect EvEnd < EventSpaceEnd(TEvents::ES_S3_PROVIDER)"); + static_assert(EvEnd < EventSpaceEnd(NKikimr::TKikimrEvents::ES_SOLOMON_PROVIDER), "expect EvEnd < EventSpaceEnd(NKikimr::TKikimrEvents::ES_SOLOMON_PROVIDER)"); + + struct TEvUpdateConsumersCount : + public NActors::TEventPB<TEvUpdateConsumersCount, NSo::MetricQueue::TEvUpdateConsumersCount, EvUpdateConsumersCount> { + + explicit TEvUpdateConsumersCount(ui64 consumersCountDelta = 0) { + Record.SetConsumersCountDelta(consumersCountDelta); + } + }; + + struct TEvAck : + public NActors::TEventPB<TEvAck, NSo::MetricQueue::TEvAck, EvAck> { + + TEvAck() = default; + explicit TEvAck(const NDqProto::TMessageTransportMeta& transportMeta) { + *Record.MutableTransportMeta() = transportMeta; + } + }; struct TEvGetNextBatch : public NActors::TEventPB<TEvGetNextBatch, NSo::MetricQueue::TEvGetNextBatch, EvGetNextBatch> { @@ -31,7 +51,7 @@ struct TEvSolomonProvider { public NActors::TEventPB<TEvMetricsBatch, NSo::MetricQueue::TEvMetricsBatch, EvMetricsBatch> { TEvMetricsBatch() = default; - TEvMetricsBatch(std::vector<NSo::MetricQueue::TMetricLabels> metrics, bool noMoreMetrics, const NDqProto::TMessageTransportMeta& transportMeta) { + TEvMetricsBatch(std::vector<NSo::MetricQueue::TMetric> metrics, bool noMoreMetrics, const NDqProto::TMessageTransportMeta& transportMeta) { Record.MutableMetrics()->Assign( metrics.begin(), metrics.end()); @@ -50,12 +70,24 @@ struct TEvSolomonProvider { } }; + struct TEvPointsCountBatch : public NActors::TEventLocal<TEvPointsCountBatch, EvPointsCountBatch> { + std::vector<NSo::TMetric> Metrics; + NSo::TGetPointsCountResponse Response; + TEvPointsCountBatch(std::vector<NSo::TMetric>&& metrics, const NSo::TGetPointsCountResponse& response) + : Metrics(std::move(metrics)) + , Response(response) + {} + }; + struct TEvNewDataBatch: public NActors::TEventLocal<TEvNewDataBatch, EvNewDataBatch> { - ui64 SelectorsCount; - NSo::ISolomonAccessorClient::TGetDataResult Result; - TEvNewDataBatch(ui64 selectorsCount, NSo::ISolomonAccessorClient::TGetDataResult result) - : SelectorsCount(selectorsCount) - , Result(std::move(result)) + NSo::TMetric Metric; + TInstant From, To; + NSo::TGetDataResponse Response; + TEvNewDataBatch(NSo::TMetric metric, TInstant from, TInstant to, const NSo::TGetDataResponse& response) + : Metric(metric) + , From(from) + , To(to) + , Response(response) {} }; }; diff --git a/ydb/library/yql/providers/solomon/expr_nodes/yql_solomon_expr_nodes.json b/ydb/library/yql/providers/solomon/expr_nodes/yql_solomon_expr_nodes.json index 5c55f4d359c..8ebb3e051fd 100644 --- a/ydb/library/yql/providers/solomon/expr_nodes/yql_solomon_expr_nodes.json +++ b/ydb/library/yql/providers/solomon/expr_nodes/yql_solomon_expr_nodes.json @@ -61,7 +61,8 @@ {"Index": 10, "Name": "DownsamplingDisabled", "Type": "TCoBool"}, {"Index": 11, "Name": "DownsamplingAggregation", "Type": "TCoAtom"}, {"Index": 12, "Name": "DownsamplingFill", "Type": "TCoAtom"}, - {"Index": 13, "Name": "DownsamplingGridSec", "Type": "TCoUint32"} + {"Index": 13, "Name": "DownsamplingGridSec", "Type": "TCoUint32"}, + {"Index": 14, "Name": "RequiredLabelNames", "Type": "TCoAtomList"} ] }, { @@ -83,8 +84,9 @@ {"Index": 2, "Name": "Object", "Type": "TSoObject"}, {"Index": 3, "Name": "SystemColumns", "Type": "TCoAtomList"}, {"Index": 4, "Name": "LabelNames", "Type": "TCoAtomList"}, - {"Index": 5, "Name": "RowType", "Type": "TExprBase"}, - {"Index": 6, "Name": "ColumnOrder", "Type": "TExprBase", "Optional": true} + {"Index": 5, "Name": "RequiredLabelNames", "Type": "TCoAtomList"}, + {"Index": 6, "Name": "RowType", "Type": "TExprBase"}, + {"Index": 7, "Name": "ColumnOrder", "Type": "TExprBase", "Optional": true} ] }, { diff --git a/ydb/library/yql/providers/solomon/proto/dq_solomon_shard.proto b/ydb/library/yql/providers/solomon/proto/dq_solomon_shard.proto index e8ceda42c38..7d7393ca063 100644 --- a/ydb/library/yql/providers/solomon/proto/dq_solomon_shard.proto +++ b/ydb/library/yql/providers/solomon/proto/dq_solomon_shard.proto @@ -66,4 +66,5 @@ message TDqSolomonSource { repeated string SystemColumns = 12; repeated string LabelNames = 13; map<string, string> Settings = 14; + repeated string RequiredLabelNames = 15; } diff --git a/ydb/library/yql/providers/solomon/proto/metrics_queue.proto b/ydb/library/yql/providers/solomon/proto/metrics_queue.proto index 289630e59c7..08c9b68646c 100644 --- a/ydb/library/yql/providers/solomon/proto/metrics_queue.proto +++ b/ydb/library/yql/providers/solomon/proto/metrics_queue.proto @@ -4,13 +4,23 @@ package NYql.NSo.MetricQueue; import "ydb/library/yql/dq/actors/protos/dq_events.proto"; +message TEvUpdateConsumersCount { + uint64 ConsumersCountDelta = 1; + + optional NYql.NDqProto.TMessageTransportMeta TransportMeta = 100; +} + +message TEvAck { + optional NYql.NDqProto.TMessageTransportMeta TransportMeta = 100; +} + message TEvGetNextBatch { optional NYql.NDqProto.TMessageTransportMeta TransportMeta = 100; } message TEvMetricsBatch { bool NoMoreMetrics = 1; - repeated TMetricLabels Metrics = 2; + repeated TMetric Metrics = 2; optional NYql.NDqProto.TMessageTransportMeta TransportMeta = 100; } @@ -21,7 +31,7 @@ message TEvMetricsReadError { optional NYql.NDqProto.TMessageTransportMeta TransportMeta = 100; } -message TMetricLabels { - string Type = 1; - map<string, string> Labels = 2; +message TMetric { + map<string, string> Labels = 1; + string Type = 2; } diff --git a/ydb/library/yql/providers/solomon/provider/ya.make b/ydb/library/yql/providers/solomon/provider/ya.make index 52fcdb10392..016593b68c1 100644 --- a/ydb/library/yql/providers/solomon/provider/ya.make +++ b/ydb/library/yql/providers/solomon/provider/ya.make @@ -26,6 +26,7 @@ PEERDIR( ydb/library/yql/providers/solomon/expr_nodes ydb/library/yql/providers/solomon/proto ydb/library/yql/providers/solomon/scheme + ydb/library/yql/providers/solomon/solomon_accessor/client ydb/public/sdk/cpp/src/client/types/credentials yql/essentials/core/dq_integration yql/essentials/providers/common/config diff --git a/ydb/library/yql/providers/solomon/provider/yql_solomon_config.cpp b/ydb/library/yql/providers/solomon/provider/yql_solomon_config.cpp index 3cc884e845b..3cb7e646688 100644 --- a/ydb/library/yql/providers/solomon/provider/yql_solomon_config.cpp +++ b/ydb/library/yql/providers/solomon/provider/yql_solomon_config.cpp @@ -11,7 +11,6 @@ TSolomonConfiguration::TSolomonConfiguration() REGISTER_SETTING(*this, MetricsQueuePrefetchSize); REGISTER_SETTING(*this, MetricsQueueBatchCountLimit); REGISTER_SETTING(*this, SolomonClientDefaultReplica); - REGISTER_SETTING(*this, MaxInflightDataRequests); REGISTER_SETTING(*this, ComputeActorBatchSize); } diff --git a/ydb/library/yql/providers/solomon/provider/yql_solomon_config.h b/ydb/library/yql/providers/solomon/provider/yql_solomon_config.h index 7fbc83f2469..fa644983900 100644 --- a/ydb/library/yql/providers/solomon/provider/yql_solomon_config.h +++ b/ydb/library/yql/providers/solomon/provider/yql_solomon_config.h @@ -15,7 +15,6 @@ struct TSolomonSettings { NCommon::TConfSetting<ui64, false> MetricsQueuePrefetchSize; NCommon::TConfSetting<ui64, false> MetricsQueueBatchCountLimit; NCommon::TConfSetting<TString, false> SolomonClientDefaultReplica; - NCommon::TConfSetting<ui64, false> MaxInflightDataRequests; NCommon::TConfSetting<ui64, false> ComputeActorBatchSize; }; diff --git a/ydb/library/yql/providers/solomon/provider/yql_solomon_datasource_type_ann.cpp b/ydb/library/yql/providers/solomon/provider/yql_solomon_datasource_type_ann.cpp index 0c4d3956bdb..b4383ec6504 100644 --- a/ydb/library/yql/providers/solomon/provider/yql_solomon_datasource_type_ann.cpp +++ b/ydb/library/yql/providers/solomon/provider/yql_solomon_datasource_type_ann.cpp @@ -34,7 +34,7 @@ public: } TStatus HandleSoSourceSettings(const TExprNode::TPtr& input, TExprContext& ctx) { - if (!EnsureArgsCount(*input, 14, ctx)) { + if (!EnsureArgsCount(*input, 15, ctx)) { return TStatus::Error; } @@ -66,6 +66,11 @@ public: if (!EnsureTupleOfAtoms(labelNames, ctx)) { return TStatus::Error; } + + auto& requiredLabelNames = *input->Child(TSoSourceSettings::idx_RequiredLabelNames); + if (!EnsureTupleOfAtoms(requiredLabelNames, ctx)) { + return TStatus::Error; + } auto& from = *input->Child(TSoSourceSettings::idx_From); if (!EnsureAtom(from, ctx) || !ValidateDatetimeFormat("from", from, ctx)) { diff --git a/ydb/library/yql/providers/solomon/provider/yql_solomon_dq_integration.cpp b/ydb/library/yql/providers/solomon/provider/yql_solomon_dq_integration.cpp index 5960203606b..afe43910ee9 100644 --- a/ydb/library/yql/providers/solomon/provider/yql_solomon_dq_integration.cpp +++ b/ydb/library/yql/providers/solomon/provider/yql_solomon_dq_integration.cpp @@ -13,6 +13,7 @@ #include <ydb/library/yql/providers/solomon/actors/dq_solomon_metrics_queue.h> #include <ydb/library/yql/providers/solomon/expr_nodes/yql_solomon_expr_nodes.h> #include <ydb/library/yql/providers/solomon/proto/dq_solomon_shard.pb.h> +#include <ydb/library/yql/providers/solomon/solomon_accessor/client/solomon_accessor_client.h> #include <util/string/builder.h> @@ -82,9 +83,20 @@ public: { } - ui64 Partition(const TExprNode& node, TVector<TString>& partitions, TString*, TExprContext&, const TPartitionSettings&) override { - Y_UNUSED(node); - partitions.push_back("zz_partition"); + ui64 Partition(const TExprNode& node, TVector<TString>& partitions, TString*, TExprContext&, const TPartitionSettings& settings) override { + const TDqSource dqSource(&node); + + if (const auto maybeSettings = dqSource.Settings().Maybe<TSoSourceSettings>()) { + const auto soSourceSettings = maybeSettings.Cast(); + if (!soSourceSettings.Selectors().StringValue().empty()) { + for (size_t i = 0; i < settings.MaxPartitions; ++i) { + partitions.push_back(TStringBuilder() << "partition" << i); + } + return 0; + } + } + + partitions.push_back("partition"); return 0; } @@ -107,20 +119,18 @@ public: const auto& clusterName = soReadObject.DataSource().Cluster().StringValue(); const auto token = "cluster:default_" + clusterName; - YQL_CLOG(INFO, ProviderS3) << "Wrap " << read->Content() << " with token: " << token; + YQL_CLOG(INFO, ProviderSolomon) << "Wrap " << read->Content() << " with token: " << token; auto settings = soReadObject.Object().Settings(); auto& settingsRef = settings.Ref(); - const auto now = TInstant::Now(); - const auto now1h = now - TDuration::Hours(1); - TString from = now1h.ToStringUpToSeconds(); - TString to = now.ToStringUpToSeconds(); + TInstant from = TInstant::Now() - TDuration::Hours(1); + TInstant to = TInstant::Now(); TString program; TString selectors; - bool downsamplingDisabled = false; - TString downsamplingAggregation = "AVG"; - TString downsamplingFill = "PREVIOUS"; - ui32 downsamplingGridSec = 15; + std::optional<bool> downsamplingDisabled; + std::optional<TString> downsamplingAggregation; + std::optional<TString> downsamplingFill; + std::optional<ui32> downsamplingGridSec; for (auto i = 0U; i < settingsRef.ChildrenSize(); ++i) { if (settingsRef.Child(i)->Head().IsAtom("from"sv)) { @@ -128,8 +138,10 @@ public: if (!ExtractSettingValue(settingsRef.Child(i)->Tail(), settingsRef.Child(i)->Head().Content(), ctx, value)) { return {}; } - - from = value; + if (!TInstant::TryParseIso8601(value, from)) { + ctx.AddError(TIssue(ctx.GetPosition(settingsRef.Child(i)->Head().Pos()), "couldn't parse `from`, use Iso8601 format, e.g. 2025-03-12T14:40:39Z")); + return {}; + } continue; } if (settingsRef.Child(i)->Head().IsAtom("to"sv)) { @@ -137,8 +149,10 @@ public: if (!ExtractSettingValue(settingsRef.Child(i)->Tail(), settingsRef.Child(i)->Head().Content(), ctx, value)) { return {}; } - - to = value; + if (!TInstant::TryParseIso8601(value, to)) { + ctx.AddError(TIssue(ctx.GetPosition(settingsRef.Child(i)->Head().Pos()), "couldn't parse `to`, use Iso8601 format, e.g. 2025-03-12T14:40:39Z")); + return {}; + } continue; } if (settingsRef.Child(i)->Head().IsAtom("program"sv)) { @@ -164,10 +178,13 @@ public: if (!ExtractSettingValue(settingsRef.Child(i)->Tail(), settingsRef.Child(i)->Head().Content(), ctx, value)) { return {}; } - if (!TryFromString<bool>(value, downsamplingDisabled)) { + bool boolValue; + if (!TryFromString<bool>(value, boolValue)) { ctx.AddError(TIssue(ctx.GetPosition(settingsRef.Child(i)->Head().Pos()), TStringBuilder() << "downsampling.disabled must be true or false, but has " << value)); return {}; } + + downsamplingDisabled = boolValue; continue; } if (settingsRef.Child(i)->Head().IsAtom("downsampling.aggregation"sv)) { @@ -212,6 +229,24 @@ public: return {}; } + if (downsamplingDisabled.has_value() && *downsamplingDisabled) { + if (downsamplingAggregation || downsamplingFill || downsamplingGridSec) { + ctx.AddError(TIssue(ctx.GetPosition(settingsRef.Pos()), "downsampling.disabled must be false if downsampling.aggregation, downsampling.fill or downsamplig.grid_interval is specified")); + return {}; + } + } else { + downsamplingDisabled = false; + if (!downsamplingAggregation) { + downsamplingAggregation = "AVG"; + } + if (!downsamplingFill) { + downsamplingFill = "PREVIOUS"; + } + if (!downsamplingGridSec) { + downsamplingGridSec = 15; + } + } + return Build<TDqSourceWrap>(ctx, read->Pos()) .Input<TSoSourceSettings>() .World(soReadObject.World()) @@ -222,14 +257,15 @@ public: .RowType(soReadObject.RowType()) .SystemColumns(soReadObject.SystemColumns()) .LabelNames(soReadObject.LabelNames()) - .From<TCoAtom>().Build(from) - .To<TCoAtom>().Build(to) + .RequiredLabelNames(soReadObject.RequiredLabelNames()) + .From<TCoAtom>().Build(from.ToStringUpToSeconds()) + .To<TCoAtom>().Build(to.ToStringUpToSeconds()) .Selectors<TCoAtom>().Build(selectors) .Program<TCoAtom>().Build(program) - .DownsamplingDisabled<TCoBool>().Literal().Build(downsamplingDisabled ? "true" : "false").Build() - .DownsamplingAggregation<TCoAtom>().Build(downsamplingAggregation) - .DownsamplingFill<TCoAtom>().Build(downsamplingFill) - .DownsamplingGridSec<TCoUint32>().Literal().Build(ToString(downsamplingGridSec)).Build() + .DownsamplingDisabled<TCoBool>().Literal().Build(*downsamplingDisabled ? "true" : "false").Build() + .DownsamplingAggregation<TCoAtom>().Build(downsamplingAggregation ? *downsamplingAggregation : "") + .DownsamplingFill<TCoAtom>().Build(downsamplingFill ? *downsamplingFill : "") + .DownsamplingGridSec<TCoUint32>().Literal().Build(ToString(downsamplingGridSec ? *downsamplingGridSec : 0)).Build() .Build() .DataSource(soReadObject.DataSource().Cast<TCoDataSource>()) .RowType(soReadObject.RowType()) @@ -243,7 +279,7 @@ public: return TSoWrite::Match(&write); } - void FillSourceSettings(const TExprNode& node, ::google::protobuf::Any& protoSettings, TString& sourceType, size_t, TExprContext&) override { + void FillSourceSettings(const TExprNode& node, ::google::protobuf::Any& protoSettings, TString& sourceType, size_t maxTasksPerStage, TExprContext&) override { const TDqSource dqSource(&node); const auto maybeSettings = dqSource.Settings().Maybe<TSoSourceSettings>(); if (!maybeSettings) { @@ -304,35 +340,37 @@ public: source.AddLabelNames(columnAsString); } + for (const auto& c : settings.RequiredLabelNames()) { + const auto& labelAsString = c.StringValue(); + source.AddRequiredLabelNames(labelAsString); + } + auto& solomonSettings = State_->Configuration; - auto metricsQueuePageSize = solomonSettings->MetricsQueuePageSize.Get().OrElse(2000); + auto metricsQueuePageSize = solomonSettings->MetricsQueuePageSize.Get().OrElse(5000); sourceSettings.insert({"metricsQueuePageSize", ToString(metricsQueuePageSize)}); - auto metricsQueuePrefetchSize = solomonSettings->MetricsQueuePrefetchSize.Get().OrElse(2000); + auto metricsQueuePrefetchSize = solomonSettings->MetricsQueuePrefetchSize.Get().OrElse(10000); sourceSettings.insert({"metricsQueuePrefetchSize", ToString(metricsQueuePrefetchSize)}); - auto metricsQueueBatchCountLimit = solomonSettings->MetricsQueueBatchCountLimit.Get().OrElse(1000); + auto metricsQueueBatchCountLimit = solomonSettings->MetricsQueueBatchCountLimit.Get().OrElse(250); sourceSettings.insert({"metricsQueueBatchCountLimit", ToString(metricsQueueBatchCountLimit)}); auto solomonClientDefaultReplica = solomonSettings->SolomonClientDefaultReplica.Get().OrElse("sas"); sourceSettings.insert({"solomonClientDefaultReplica", ToString(solomonClientDefaultReplica)}); - auto maxInflightDataRequests = solomonSettings->MaxInflightDataRequests.Get().OrElse(100); - sourceSettings.insert({"maxInflightDataRequests", ToString(maxInflightDataRequests)}); - - auto computeActorBatchSize = solomonSettings->ComputeActorBatchSize.Get().OrElse(10000); + auto computeActorBatchSize = solomonSettings->ComputeActorBatchSize.Get().OrElse(1000); sourceSettings.insert({"computeActorBatchSize", ToString(computeActorBatchSize)}); if (!selectors.empty()) { - NDq::TDqSolomonReadParams readParams{ .Source = source }; - auto providerFactory = CreateCredentialsProviderFactoryForStructuredToken(State_->CredentialsFactory, State_->Configuration->Tokens.at(cluster)); auto credentialsProvider = providerFactory->CreateProvider(); + + NDq::TDqSolomonReadParams readParams{ .Source = source }; auto metricsQueueActor = NActors::TActivationContext::ActorSystem()->Register( NDq::CreateSolomonMetricsQueueActor( - 1, + maxTasksPerStage, readParams, credentialsProvider ), diff --git a/ydb/library/yql/providers/solomon/provider/yql_solomon_io_discovery.cpp b/ydb/library/yql/providers/solomon/provider/yql_solomon_io_discovery.cpp index a1ea4c46062..0a6a3c2ac2b 100644 --- a/ydb/library/yql/providers/solomon/provider/yql_solomon_io_discovery.cpp +++ b/ydb/library/yql/providers/solomon/provider/yql_solomon_io_discovery.cpp @@ -166,6 +166,7 @@ public: .Object(soObject) .SystemColumns(systemColumnsNode) .LabelNames(labelNamesNode) + .RequiredLabelNames().Build() .RowType(rowTypeNode) .ColumnOrder(std::move(userSchema.back())) .Done().Ptr() @@ -175,6 +176,7 @@ public: .Object(soObject) .SystemColumns(systemColumnsNode) .LabelNames(labelNamesNode) + .RequiredLabelNames().Build() .RowType(rowTypeNode) .Done().Ptr(); } diff --git a/ydb/library/yql/providers/solomon/provider/yql_solomon_load_meta.cpp b/ydb/library/yql/providers/solomon/provider/yql_solomon_load_meta.cpp index 712877a6f23..67982a34eb9 100644 --- a/ydb/library/yql/providers/solomon/provider/yql_solomon_load_meta.cpp +++ b/ydb/library/yql/providers/solomon/provider/yql_solomon_load_meta.cpp @@ -1,9 +1,40 @@ #include "yql_solomon_provider_impl.h" +#include <ydb/library/yql/providers/dq/expr_nodes/dqs_expr_nodes.h> +#include <ydb/library/yql/providers/solomon/expr_nodes/yql_solomon_expr_nodes.h> +#include <ydb/library/yql/providers/solomon/solomon_accessor/client/solomon_accessor_client.h> +#include <yql/essentials/core/yql_expr_optimize.h> +#include <yql/essentials/providers/common/provider/yql_provider_names.h> + namespace NYql { using namespace NNodes; +namespace { + +TMaybe<TString> ExtractSetting(const TExprNode& settings, const TString& settingName) { + for (size_t i = 0U; i < settings.ChildrenSize(); ++i) { + if (settings.Child(i)->Head().IsAtom(settingName)) { + return TString(settings.Child(i)->Tail().Content()); + } + } + + return {}; +} + +NSo::NProto::ESolomonClusterType MapClusterType(TSolomonClusterConfig::ESolomonClusterType clusterType) { + switch (clusterType) { + case TSolomonClusterConfig::SCT_SOLOMON: + return NSo::NProto::ESolomonClusterType::CT_SOLOMON; + case TSolomonClusterConfig::SCT_MONITORING: + return NSo::NProto::ESolomonClusterType::CT_MONITORING; + default: + YQL_ENSURE(false, "Invalid cluster type " << ToString<ui32>(clusterType)); + } +} + +} // namespace + class TSolomonLoadTableMetadataTransformer : public TGraphTransformerBase { public: TSolomonLoadTableMetadataTransformer(TSolomonState::TPtr state) @@ -12,25 +43,96 @@ public: } TStatus DoTransform(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) final { - Y_UNUSED(input); - Y_UNUSED(output); - + output = input; if (ctx.Step.IsDone(TExprStep::LoadTablesMetadata)) { return TStatus::Ok; } - return TStatus::Ok; + auto nodes = FindNodes(input, [&](const TExprNode::TPtr& node) -> bool { + if (const auto maybeRead = TMaybeNode<TSoReadObject>(node)) { + TSoReadObject read(node); + auto& settings = read.Object().Settings().Ref(); + if (read.RequiredLabelNames().Empty() && ExtractSetting(settings, "selectors")) { + return true; + } + } + return false; + }); + + std::vector<NThreading::TFuture<NSo::TGetLabelsResponse>> futures; + futures.reserve(nodes.size()); + for (const auto& n : nodes) { + TSoReadObject soReadObject(n); + + const auto& clusterName = soReadObject.DataSource().Cluster().StringValue(); + const auto* clusterDesc = State_->Configuration->ClusterConfigs.FindPtr(clusterName); + auto& settings = soReadObject.Object().Settings().Ref(); + + if (auto maybeSelectors = ExtractSetting(settings, "selectors")) { + NSo::NProto::TDqSolomonSource source; + source.SetEndpoint(clusterDesc->GetCluster()); + source.SetProject(soReadObject.Object().Project().StringValue()); + source.SetClusterType(MapClusterType(clusterDesc->GetClusterType())); + source.SetUseSsl(clusterDesc->GetUseSsl()); + + auto defaultReplica = State_->Configuration->SolomonClientDefaultReplica.Get().OrElse("sas"); + source.MutableSettings()->insert({ "solomonClientDefaultReplica", ToString(defaultReplica) }); + + auto providerFactory = CreateCredentialsProviderFactoryForStructuredToken(State_->CredentialsFactory, State_->Configuration->Tokens.at(clusterName)); + auto credentialsProvider = providerFactory->CreateProvider(); + + SolomonClient_ = NSo::ISolomonAccessorClient::Make(std::move(source), credentialsProvider); + auto future = SolomonClient_->GetLabelNames(*maybeSelectors); + + LabelNamesRequests_[soReadObject.Raw()] = future; + futures.push_back(future); + } + } + + if (futures.empty()) { + return TStatus::Ok; + } + + AllFuture_ = NThreading::WaitExceptionOrAll(futures); + return TStatus::Async; } - NThreading::TFuture<void> DoGetAsyncFuture(const TExprNode& input) final { - Y_UNUSED(input); - return AsyncFuture_; + NThreading::TFuture<void> DoGetAsyncFuture(const TExprNode&) final { + return AllFuture_; } - TStatus DoApplyAsyncChanges(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext&) final { - output = input; + TStatus DoApplyAsyncChanges(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) final { + AllFuture_.GetValue(); + + TNodeMap<NThreading::TFuture<NSo::TGetLabelsResponse>> labelNamesRequests; + labelNamesRequests.swap(LabelNamesRequests_); + + TNodeOnNodeOwnedMap replaces; + for (auto& [node, request] : labelNamesRequests) { + auto value = request.GetValue(); + + if (value.Status != NSo::EStatus::STATUS_OK) { + ctx.AddError(TIssue(ctx.GetPosition(node->Pos()), + TStringBuilder() << "Failed to get label names, details: " << value.Error)); + return TStatus::Error; + } - return TStatus::Ok; + TSoReadObject read(node); + TVector<TCoAtom> labelNames; + for (const auto& label : value.Result.Labels) { + labelNames.push_back(Build<TCoAtom>(ctx, read.Pos()).Value(label).Done()); + } + + replaces.emplace(node, + Build<TSoReadObject>(ctx, read.Pos()) + .InitFrom(read) + .RequiredLabelNames() + .Add(labelNames) + .Build() + .Done().Ptr()); + } + + return RemapExpr(input, output, replaces, ctx, TOptimizeExprSettings{nullptr}); } void Rewind() final { @@ -38,7 +140,10 @@ public: private: TSolomonState::TPtr State_; - NThreading::TFuture<void> AsyncFuture_; + NThreading::TFuture<void> AllFuture_; + + NSo::ISolomonAccessorClient::TPtr SolomonClient_; + TNodeMap<NThreading::TFuture<NSo::TGetLabelsResponse>> LabelNamesRequests_; }; THolder<IGraphTransformer> CreateSolomonLoadTableMetadataTransformer(TSolomonState::TPtr state) { diff --git a/ydb/library/yql/providers/solomon/provider/yql_solomon_provider_impl.h b/ydb/library/yql/providers/solomon/provider/yql_solomon_provider_impl.h index 1d1753aba6b..df477bdbf38 100644 --- a/ydb/library/yql/providers/solomon/provider/yql_solomon_provider_impl.h +++ b/ydb/library/yql/providers/solomon/provider/yql_solomon_provider_impl.h @@ -2,6 +2,7 @@ #include "yql_solomon_provider.h" +#include <ydb/library/yql/providers/solomon/proto/dq_solomon_shard.pb.h> #include <yql/essentials/core/yql_graph_transformer.h> #include <yql/essentials/providers/common/transform/yql_exec.h> #include <yql/essentials/providers/common/transform/yql_visit.h> diff --git a/ydb/library/yql/providers/solomon/solomon_accessor/client/solomon_accessor_client.cpp b/ydb/library/yql/providers/solomon/solomon_accessor/client/solomon_accessor_client.cpp index 8c774ec4a8e..3dc8ad9a36c 100644 --- a/ydb/library/yql/providers/solomon/solomon_accessor/client/solomon_accessor_client.cpp +++ b/ydb/library/yql/providers/solomon/solomon_accessor/client/solomon_accessor_client.cpp @@ -1,13 +1,20 @@ #include "solomon_accessor_client.h" +#include <library/cpp/json/writer/json.h> #include <library/cpp/protobuf/interop/cast.h> +#include <library/cpp/threading/future/wait/wait.h> +#include <util/string/join.h> #include <ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h> #include <ydb/public/sdk/cpp/src/library/grpc/client/grpc_client_low.h> #include <yql/essentials/utils/url_builder.h> #include <yql/essentials/utils/yql_panic.h> -#include <ydb/library/yql/providers/solomon/solomon_accessor/grpc/solomon_accessor_pb.pb.h> -#include <ydb/library/yql/providers/solomon/solomon_accessor/grpc/solomon_accessor_pb.grpc.pb.h> +#include <ydb/library/yql/providers/solomon/solomon_accessor/grpc/data_service.pb.h> +#include <ydb/library/yql/providers/solomon/solomon_accessor/grpc/data_service.grpc.pb.h> + +#include <util/string/join.h> +#include <util/string/strip.h> +#include <util/string/split.h> namespace NYql::NSo { @@ -15,8 +22,7 @@ using namespace yandex::monitoring::api::v3; namespace { -Downsampling::GapFilling ParseGapFilling(const TString& fill) -{ +Downsampling::GapFilling ParseGapFilling(const TString& fill) { if (fill == "NULL"sv) { return Downsampling::GAP_FILLING_NULL; } @@ -29,8 +35,7 @@ Downsampling::GapFilling ParseGapFilling(const TString& fill) return Downsampling::GAP_FILLING_UNSPECIFIED; } -Downsampling::GridAggregation ParseGridAggregation(const TString& aggregation) -{ +Downsampling::GridAggregation ParseGridAggregation(const TString& aggregation) { if (aggregation == "MAX"sv) { return Downsampling::GRID_AGGREGATION_MAX; } @@ -52,8 +57,7 @@ Downsampling::GridAggregation ParseGridAggregation(const TString& aggregation) return Downsampling::GRID_AGGREGATION_UNSPECIFIED; } -TString MetricTypeToString(MetricType type) -{ +TString MetricTypeToString(MetricType type) { switch (type) { case MetricType::DGAUGE: return "DGAUGE"; @@ -68,106 +72,283 @@ TString MetricTypeToString(MetricType type) } } -class TSolomonAccessorClient : public ISolomonAccessorClient, public std::enable_shared_from_this<TSolomonAccessorClient> -{ +std::vector<TString> ParseKnownLabelNames(const TString& selectors) { + auto selectorValues = StringSplitter(selectors.substr(1, selectors.size() - 2)).Split(',').SkipEmpty().ToList<TString>(); + std::vector<TString> result; + result.reserve(selectorValues.size()); + for (const auto& value : selectorValues) { + result.push_back(StripString(value.substr(0, value.find('=')))); + } + return result; +} + +TGetLabelsResponse ProcessGetLabelsResponse(NYql::IHTTPGateway::TResult&& response, std::vector<TString>&& knownLabelNames) { + TGetLabelsResult result; + + if (response.CurlResponseCode != CURLE_OK) { + return TGetLabelsResponse(TStringBuilder{} << "Error while sending list metric names request to monitoring api: " << response.Issues.ToOneLineString()); + } + + if (response.Content.HttpResponseCode < 200 || response.Content.HttpResponseCode >= 300) { + return TGetLabelsResponse(TStringBuilder{} << "Error while sending list metric names request to monitoring api: " << response.Content.data()); + } + + NJson::TJsonValue json; + try { + NJson::ReadJsonTree(response.Content.data(), &json, /*throwOnError*/ true); + } catch (const std::exception& e) { + return TGetLabelsResponse(TStringBuilder{} << "Failed to parse response from monitoring api: " << e.what()); + } + + if (!json.IsMap() || !json.Has("names") || !json["names"].IsArray()) { + return TGetLabelsResponse("Invalid result from monitoring api"); + } + + const auto names = json["names"].GetArray(); + + for (const auto& name : names) { + if (!name.IsString()) { + return TGetLabelsResponse("Invalid label names from monitoring api"); + } else { + result.Labels.push_back(name.GetString()); + } + } + result.Labels.insert(result.Labels.end(), knownLabelNames.begin(), knownLabelNames.end()); + + return TGetLabelsResponse(std::move(result)); +} + +TListMetricsResponse ProcessListMetricsResponse(NYql::IHTTPGateway::TResult&& response) { + TListMetricsResult result; + + if (response.CurlResponseCode != CURLE_OK) { + return TListMetricsResponse(TStringBuilder{} << "Error while sending list metrics request to monitoring api: " << response.Issues.ToOneLineString()); + } + + if (response.Content.HttpResponseCode < 200 || response.Content.HttpResponseCode >= 300) { + return TListMetricsResponse(TStringBuilder{} << "Error while sending list metrics request to monitoring api: " << response.Content.data()); + } + + NJson::TJsonValue json; + try { + NJson::ReadJsonTree(response.Content.data(), &json, /*throwOnError*/ true); + } catch (const std::exception& e) { + return TListMetricsResponse(TStringBuilder{} << "Failed to parse response from monitoring api: " << e.what()); + } + + if (!json.IsMap() || !json.Has("result") || !json.Has("page")) { + return TListMetricsResponse("Invalid list metrics result from monitoring api"); + } + + const auto pagesInfo = json["page"]; + if (!pagesInfo.IsMap() || !pagesInfo.Has("pagesCount") || !pagesInfo["pagesCount"].IsInteger()) { + return TListMetricsResponse("Invalid paging info from monitoring api"); + } + + result.PagesCount = pagesInfo["pagesCount"].GetInteger(); + + for (const auto& metricObj : json["result"].GetArray()) { + if (!metricObj.IsMap() || !metricObj.Has("labels") || !metricObj["labels"].IsMap() || !metricObj.Has("type") || !metricObj["type"].IsString()) { + return TListMetricsResponse("Invalid list metrics result from monitoring api"); + } + + std::map<TString, TString> metricLabels; + for (const auto& [key, value] : metricObj["labels"].GetMap()) { + metricLabels[key] = value.GetString(); + } + + result.Metrics.emplace_back(std::move(metricLabels), metricObj["type"].GetString()); + } + + return TListMetricsResponse(std::move(result)); +} + +TGetPointsCountResponse ProcessGetPointsCountResponse(NYql::IHTTPGateway::TResult&& response, size_t expectedSize) { + TGetPointsCountResult result; + + if (response.CurlResponseCode != CURLE_OK) { + return TGetPointsCountResponse(TStringBuilder() << "Error while sending points count request to monitoring api: " << response.Issues.ToOneLineString()); + } + + if (response.Content.HttpResponseCode < 200 || response.Content.HttpResponseCode >= 300) { + return TGetPointsCountResponse(TStringBuilder{} << "Error while sending points count request to monitoring api: " << response.Content.data()); + } + + NJson::TJsonValue json; + try { + NJson::ReadJsonTree(response.Content.data(), &json, /*throwOnError*/ true); + } catch (const std::exception& e) { + return TGetPointsCountResponse(TStringBuilder{} << "Failed to parse points count response from monitoring api: " << e.what()); + } + + if (!json.Has("vector") || !json["vector"].IsArray()) { + return TGetPointsCountResponse("Invalid points count result from monitoring api"); + } + + const auto counts = json["vector"].GetArray(); + if (counts.size() != expectedSize) { + return TGetPointsCountResponse("Invalid points count response size from monitoring api"); + } + + for (size_t i = 0; i < counts.size(); ++i) { + if (!counts[i].IsMap() || !counts[i].Has("scalar") || !counts[i].GetMap().at("scalar").IsInteger()) { + return TGetPointsCountResponse("Invalid points count response format from monitoring api"); + } + result.PointsCount.push_back(counts[i].GetMap().at("scalar").GetInteger()); + } + + return TGetPointsCountResponse(std::move(result)); +} + +TGetDataResponse ProcessGetDataResponse(NYdbGrpc::TGrpcStatus&& status, ReadResponse&& response) { + TGetDataResult result; + + if (!status.Ok()) { + if (status.GRpcStatusCode == grpc::StatusCode::RESOURCE_EXHAUSTED) { + return TGetDataResponse(); + } + return TGetDataResponse(TStringBuilder{} << "Error while sending data request to monitoring api: " << status.Msg); + } + + if (response.response_per_query_size() != 1) { + return TGetDataResponse("Invalid get data repsonse size from monitoring api"); + } + + const auto& responseValue = response.response_per_query()[0]; + YQL_ENSURE(responseValue.has_timeseries_vector()); + for (const auto& queryResponse : responseValue.timeseries_vector().values()) { + auto type = MetricTypeToString(queryResponse.type()); + + std::map<TString, TString> labels(queryResponse.labels().begin(), queryResponse.labels().end()); + std::vector<int64_t> timestamps(queryResponse.timestamp_values().values().begin(), queryResponse.timestamp_values().values().end()); + std::vector<double> values(queryResponse.double_values().values().begin(), queryResponse.double_values().values().end()); + + TMetric metric { + .Labels = labels, + .Type = type, + }; + + result.Timeseries.emplace_back(std::move(metric), std::move(timestamps), std::move(values)); + } + + return TGetDataResponse(std::move(result)); +} + +class TSolomonAccessorClient : public ISolomonAccessorClient, public std::enable_shared_from_this<TSolomonAccessorClient> { public: TSolomonAccessorClient( const TString& defaultReplica, const ui64 defaultGrpcPort, - const NYql::NSo::NProto::TDqSolomonSource& settings, - std::shared_ptr<NYdb::ICredentialsProvider> credentialsProvider - ) + NYql::NSo::NProto::TDqSolomonSource&& settings, + std::shared_ptr<NYdb::ICredentialsProvider> credentialsProvider) : DefaultReplica(defaultReplica) , DefaultGrpcPort(defaultGrpcPort) - , Settings(settings) - , CredentialsProvider(credentialsProvider) - , HttpGateway(IHTTPGateway::Make()) - , GrpcClient(std::make_shared<NYdbGrpc::TGRpcClientLow>()) - { + , Settings(std::move(settings)) + , CredentialsProvider(credentialsProvider) { + + HttpConfig.SetMaxInFlightCount(HttpMaxInflight); + HttpGateway = IHTTPGateway::Make(&HttpConfig); + GrpcConfig.Locator = GetGrpcSolomonEndpoint(); GrpcConfig.EnableSsl = Settings.GetUseSsl(); + GrpcConfig.MaxInFlight = GrpcMaxInflight; + GrpcClient = std::make_shared<NYdbGrpc::TGRpcClientLow>(); + GrpcConnection = GrpcClient->CreateGRpcServiceConnection<DataService>(GrpcConfig); + } + + ~TSolomonAccessorClient() override { + GrpcClient->Stop(); } public: - NThreading::TFuture<TListMetricsResult> ListMetrics(const TString& selectors, int pageSize, int page) const override final - { - const auto request = BuildListMetricsRequest(selectors, pageSize, page); + NThreading::TFuture<TGetLabelsResponse> GetLabelNames(const TString& selectors) const override final { + auto resultPromise = NThreading::NewPromise<TGetLabelsResponse>(); + + auto cb = [resultPromise, selectors](NYql::IHTTPGateway::TResult&& result) mutable { + resultPromise.SetValue(ProcessGetLabelsResponse(std::move(result), ParseKnownLabelNames(selectors))); + }; - IHTTPGateway::THeaders headers; - if (auto authInfo = GetAuthInfo()) { - headers.Fields.emplace_back(TStringBuilder{} << "Authorization: " << *authInfo); - } + DoHttpRequest( + std::move(cb), + BuildGetLabelsUrl(selectors) + ); + + return resultPromise.GetFuture(); + } - auto resultPromise = NThreading::NewPromise<TListMetricsResult>(); + NThreading::TFuture<TListMetricsResponse> ListMetrics(const TString& selectors, int pageSize, int page) const override final { + auto resultPromise = NThreading::NewPromise<TListMetricsResponse>(); - std::weak_ptr<const TSolomonAccessorClient> weakSelf = shared_from_this(); - // hold context until reply - auto cb = [weakSelf, resultPromise](NYql::IHTTPGateway::TResult&& result) mutable - { - if (auto self = weakSelf.lock()) { - resultPromise.SetValue(self->ProcessHttpResponse(std::move(result))); - } else { - resultPromise.SetValue(TListMetricsResult("Client has been shut down")); - } + auto cb = [resultPromise](NYql::IHTTPGateway::TResult&& result) mutable { + resultPromise.SetValue(ProcessListMetricsResponse(std::move(result))); + }; + + DoHttpRequest( + std::move(cb), + BuildListMetricsUrl(selectors, pageSize, page) + ); + + return resultPromise.GetFuture(); + } + + NThreading::TFuture<TGetPointsCountResponse> GetPointsCount(const std::vector<TMetric>& metrics) const override final { + auto requestUrl = BuildGetPointsCountUrl(); + auto requestBody = BuildGetPointsCountBody(metrics); + + auto resultPromise = NThreading::NewPromise<TGetPointsCountResponse>(); + + auto cb = [resultPromise, expectedSize = metrics.size()](NYql::IHTTPGateway::TResult&& response) mutable { + resultPromise.SetValue(ProcessGetPointsCountResponse(std::move(response), expectedSize)); }; - HttpGateway->Download( - request, - headers, - 0, - ListSizeLimit, - std::move(cb) + DoHttpRequest( + std::move(cb), + std::move(requestUrl), + std::move(requestBody) ); return resultPromise.GetFuture(); } - NThreading::TFuture<TGetDataResult> GetData(const std::vector<TString>& selectors) const override final - { - const auto request = BuildGetDataRequest(selectors); + NThreading::TFuture<TGetDataResponse> GetData(TMetric metric, TInstant from, TInstant to) const override final { + return GetData(BuildSelectorsProgram(metric), from, to); + } + + NThreading::TFuture<TGetDataResponse> GetData(TString selectors, TInstant from, TInstant to) const override final { + const auto request = BuildGetDataRequest(selectors, from, to); NYdbGrpc::TCallMeta callMeta; if (auto authInfo = GetAuthInfo()) { callMeta.Aux.emplace_back("authorization", *authInfo); } + callMeta.Aux.emplace_back("x-client-id", "yandex-query"); - auto resultPromise = NThreading::NewPromise<TGetDataResult>(); - - const auto connection = GrpcClient->CreateGRpcServiceConnection<DataService>(GrpcConfig); + auto resultPromise = NThreading::NewPromise<TGetDataResponse>(); auto context = GrpcClient->CreateContext(); if (!context) { - resultPromise.SetValue(TGetDataResult("Client is being shutted down")); + resultPromise.SetValue(TGetDataResponse("Client is being shutted down")); return resultPromise.GetFuture(); } - std::weak_ptr<const TSolomonAccessorClient> weakSelf = shared_from_this(); // hold context until reply - auto cb = [weakSelf, resultPromise, context]( - NYdbGrpc::TGrpcStatus&& status, - ReadResponse&& result) mutable - { - if (auto self = weakSelf.lock()) { - resultPromise.SetValue(self->ProcessGrpcResponse(std::move(status), std::move(result))); - } else { - resultPromise.SetValue(TGetDataResult("Client has been shut down")); - } + auto cb = [resultPromise, context](NYdbGrpc::TGrpcStatus&& status, ReadResponse&& result) mutable { + resultPromise.SetValue(ProcessGetDataResponse(std::move(status), std::move(result))); }; - connection->DoRequest<ReadRequest, ReadResponse>( - std::move(request), - std::move(cb), - &DataService::Stub::AsyncRead, - callMeta, - context.get() - ); + GrpcConnection->DoRequest<ReadRequest, ReadResponse>( + std::move(request), + std::move(cb), + &DataService::Stub::AsyncRead, + callMeta, + context.get() + ); return resultPromise.GetFuture(); } private: - TMaybe<TString> GetAuthInfo() const - { + std::optional<TString> GetAuthInfo() const { if (!Settings.GetUseSsl()) { return {}; } @@ -184,18 +365,75 @@ private: } } - TString GetHttpSolomonEndpoint() const - { + TString GetHttpSolomonEndpoint() const { return (Settings.GetUseSsl() ? "https://" : "http://") + Settings.GetEndpoint(); } - TString GetGrpcSolomonEndpoint() const - { + TString GetGrpcSolomonEndpoint() const { return TStringBuilder() << Settings.GetEndpoint() << ":" << DefaultGrpcPort; } - TString BuildListMetricsRequest(const TString& selectors, int pageSize, int page) const - { + template <typename TCallback> + void DoHttpRequest(TCallback&& callback, TString&& url, TString&& body = "") const { + IHTTPGateway::THeaders headers; + if (auto authInfo = GetAuthInfo()) { + headers.Fields.emplace_back(TStringBuilder{} << "Authorization: " << *authInfo); + } + headers.Fields.emplace_back("x-client-id: yandex-query"); + headers.Fields.emplace_back("accept: application/json;charset=UTF-8"); + headers.Fields.emplace_back("Content-Type: application/json;charset=UTF-8"); + + auto retryPolicy = IHTTPGateway::TRetryPolicy::GetExponentialBackoffPolicy( + [](CURLcode, long httpCode) { + if (httpCode == 429 /*RESOURCE_EXHAUSTED*/) { + return ERetryErrorClass::ShortRetry; + } + return ERetryErrorClass::NoRetry; + }, + TDuration::MilliSeconds(5), + TDuration::MilliSeconds(200), + TDuration::MilliSeconds(500) + ); + + if (!body.empty()) { + HttpGateway->Upload( + std::move(url), + std::move(headers), + std::move(body), + std::move(callback), + false, + retryPolicy + ); + } else { + HttpGateway->Download( + std::move(url), + std::move(headers), + 0, + ListSizeLimit, + std::move(callback), + {}, + retryPolicy + ); + } + } + + TString BuildGetLabelsUrl(const TString& selectors) const { + TUrlBuilder builder(GetHttpSolomonEndpoint()); + + builder.AddPathComponent("api"); + builder.AddPathComponent("v2"); + builder.AddPathComponent("projects"); + builder.AddPathComponent(Settings.GetProject()); + builder.AddPathComponent("sensors"); + builder.AddPathComponent("names"); + + builder.AddUrlParam("selectors", selectors); + builder.AddUrlParam("forceCluster", DefaultReplica); + + return builder.Build(); + } + + TString BuildListMetricsUrl(const TString& selectors, int pageSize, int page) const { TUrlBuilder builder(GetHttpSolomonEndpoint()); builder.AddPathComponent("api"); @@ -208,17 +446,67 @@ private: builder.AddUrlParam("forceCluster", DefaultReplica); builder.AddUrlParam("pageSize", std::to_string(pageSize)); builder.AddUrlParam("page", std::to_string(page)); + builder.AddUrlParam("from", TInstant::Seconds(Settings.GetFrom()).ToIsoStringLocalUpToSeconds()); + builder.AddUrlParam("to", TInstant::Seconds(Settings.GetTo()).ToIsoStringLocalUpToSeconds()); + + return builder.Build(); + } + + TString BuildGetPointsCountUrl() const { + TUrlBuilder builder(GetHttpSolomonEndpoint()); + + builder.AddPathComponent("api"); + builder.AddPathComponent("v2"); + builder.AddPathComponent("projects"); + builder.AddPathComponent(Settings.GetProject()); + builder.AddPathComponent("sensors"); + builder.AddPathComponent("data"); return builder.Build(); } - ReadRequest BuildGetDataRequest(const std::vector<TString>& selectors) const - { + TString BuildGetPointsCountBody(const std::vector<TMetric>& metrics) const { + std::vector<TString> selectors; + selectors.reserve(metrics.size()); + std::transform( + metrics.begin(), + metrics.end(), + std::back_inserter(selectors), + [this](const TMetric& metric) -> TString { + return TStringBuilder() << "count(" << BuildSelectorsProgram(metric) << ")"; + } + ); + + TString program = TStringBuilder() << "[" << JoinSeq(",", selectors) << "]"; + + const auto& ds = Settings.GetDownsampling(); + NJsonWriter::TBuf w; + w.BeginObject() + .UnsafeWriteKey("from").WriteString(TInstant::Seconds(Settings.GetFrom()).ToString()) + .UnsafeWriteKey("to").WriteString(TInstant::Seconds(Settings.GetTo()).ToString()) + .UnsafeWriteKey("program").WriteString(program) + .UnsafeWriteKey("downsampling") + .BeginObject() + .UnsafeWriteKey("disabled").WriteBool(ds.GetDisabled()); + + if (!ds.GetDisabled()) { + w.UnsafeWriteKey("aggregation").WriteString(ds.GetAggregation()) + .UnsafeWriteKey("fill").WriteString(ds.GetFill()) + .UnsafeWriteKey("gridMillis").WriteLongLong(ds.GetGridMs()); + } + w.EndObject() + .UnsafeWriteKey("forceCluster").WriteString(DefaultReplica) + .EndObject(); + + return w.Str(); + } + + ReadRequest BuildGetDataRequest(const TString& selectors, TInstant from, TInstant to) const { ReadRequest request; request.mutable_container()->set_project_id(Settings.GetProject()); - *request.mutable_from_time() = NProtoInterop::CastToProto(TInstant::Seconds(Settings.GetFrom())); - *request.mutable_to_time() = NProtoInterop::CastToProto(TInstant::Seconds(Settings.GetTo())); + *request.mutable_from_time() = NProtoInterop::CastToProto(from); + *request.mutable_to_time() = NProtoInterop::CastToProto(to); *request.mutable_force_replica() = DefaultReplica; if (Settings.GetDownsampling().GetDisabled()) { @@ -230,152 +518,54 @@ private: request.mutable_downsampling()->set_gap_filling(ParseGapFilling(downsampling.GetFill())); } - ui64 cnt = 0; - for (const auto& metric : selectors) { - auto query = request.mutable_queries()->Add(); - *query->mutable_value() = metric; - *query->mutable_name() = TStringBuilder() << "query" << cnt++; - query->set_hidden(false); - } + auto query = request.mutable_queries()->Add(); + *query->mutable_value() = selectors; + *query->mutable_name() = "query"; + query->set_hidden(false); return request; } - TListMetricsResult ProcessHttpResponse(NYql::IHTTPGateway::TResult&& response) const - { - std::vector<TMetric> result; - - if (response.Content.HttpResponseCode < 200 || response.Content.HttpResponseCode >= 300) { - return TListMetricsResult(TStringBuilder{} << "Error while sending list metrics request to monitoring api: " << response.Content.data()); - } - - NJson::TJsonValue json; - try { - NJson::ReadJsonTree(response.Content.data(), &json, /*throwOnError*/ true); - } catch (const std::exception& e) { - return TListMetricsResult(TStringBuilder{} << "Failed to parse response from monitoring api: " << e.what()); - } - - if (!json.IsMap() || !json.Has("result") || !json.Has("page")) { - return TListMetricsResult(TStringBuilder{} << "Invalid result from monitoring api"); - } - - const auto pagesInfo = json["page"]; - if (!pagesInfo.IsMap() || !pagesInfo.Has("pagesCount") || !pagesInfo["pagesCount"].IsInteger()) { - return TListMetricsResult(TStringBuilder{} << "Invalid paging info from monitoring api"); - } - - size_t pagesCount = pagesInfo["pagesCount"].GetInteger(); - - for (const auto& metricObj : json["result"].GetArray()) { - try { - result.emplace_back(metricObj); - } catch (const std::exception& e) { - return TListMetricsResult(TStringBuilder{} << "Failed to parse result response from monitoring: " << e.what()); + TString BuildSelectorsProgram(const TMetric& metric) const { + std::vector<TString> mappedValues; + mappedValues.reserve(Settings.GetRequiredLabelNames().size()); + std::transform( + Settings.GetRequiredLabelNames().begin(), + Settings.GetRequiredLabelNames().end(), + std::back_inserter(mappedValues), + [&labels = metric.Labels](const TString& labelName) -> TString { + if (labels.find(labelName) == labels.end()) { + return TStringBuilder() << labelName << "=\"-\""; + } + return TStringBuilder() << labelName << "=\"" << labels.at(labelName) << "\""; } - } - - return TListMetricsResult(pagesCount, std::move(result)); - } - - TGetDataResult ProcessGrpcResponse(NYdbGrpc::TGrpcStatus&& status, ReadResponse&& response) const - { - std::vector<TTimeseries> result; - - if (!status.Ok()) { - return TGetDataResult(TStringBuilder{} << "Error while sending data request to monitoring api: " << status.Msg); - } - - for (const auto& responseValue : response.response_per_query()) { - YQL_ENSURE(responseValue.has_timeseries_vector()); - for (const auto& queryResponse : responseValue.timeseries_vector().values()) { - auto type = MetricTypeToString(queryResponse.type()); - - std::map<TString, TString> labels(queryResponse.labels().begin(), queryResponse.labels().end()); - std::vector<int64_t> timestamps(queryResponse.timestamp_values().values().begin(), queryResponse.timestamp_values().values().end()); - std::vector<double> values(queryResponse.double_values().values().begin(), queryResponse.double_values().values().end()); - - result.emplace_back(queryResponse.name(), std::move(labels), type, std::move(timestamps), std::move(values)); - } - - } + ); - return TGetDataResult(std::move(result)); + return TStringBuilder() << "{" << JoinSeq(",", mappedValues) << "}"; } private: const TString DefaultReplica; const ui64 DefaultGrpcPort; - const size_t ListSizeLimit = 1ull << 20; - const NYql::NSo::NProto::TDqSolomonSource& Settings; + const ui64 ListSizeLimit = 1ull << 20; + const ui64 HttpMaxInflight = 200; + const ui64 GrpcMaxInflight = 2000; + const NYql::NSo::NProto::TDqSolomonSource Settings; const std::shared_ptr<NYdb::ICredentialsProvider> CredentialsProvider; + THttpGatewayConfig HttpConfig; IHTTPGateway::TPtr HttpGateway; NYdbGrpc::TGRpcClientConfig GrpcConfig; + std::unique_ptr<NYdbGrpc::TServiceConnection<DataService>> GrpcConnection; std::shared_ptr<NYdbGrpc::TGRpcClientLow> GrpcClient; }; } // namespace -TMetric::TMetric(const NJson::TJsonValue& value) -{ - YQL_ENSURE(value.IsMap()); - - if (value.Has("labels")) { - auto labels = value["labels"]; - YQL_ENSURE(labels.IsMap()); - - for (const auto& [key, value] : labels.GetMapSafe()) { - YQL_ENSURE(value.IsString()); - Labels[key] = value.GetString(); - } - } - - if (value.Has("type")) { - YQL_ENSURE(value["type"].IsString()); - Type = value["type"].GetString(); - } - - if (value.Has("createdAt")) { - YQL_ENSURE(value["createdAt"].IsString()); - CreatedAt = value["createdAt"].GetString(); - } -} - -ISolomonAccessorClient::TListMetricsResult::TListMetricsResult() - : Success(false) -{} - -ISolomonAccessorClient::TListMetricsResult::TListMetricsResult(const TString& error) - : Success(false) - , ErrorMsg(error) -{} - -ISolomonAccessorClient::TListMetricsResult::TListMetricsResult(size_t pagesCount, std::vector<TMetric>&& result) - : Success(true) - , PagesCount(pagesCount) - , Result(std::move(result)) -{} - -ISolomonAccessorClient::TGetDataResult::TGetDataResult() - : Success(false) -{} - -ISolomonAccessorClient::TGetDataResult::TGetDataResult(const TString& error) - : Success(false) - , ErrorMsg(error) -{} - -ISolomonAccessorClient::TGetDataResult::TGetDataResult(std::vector<TTimeseries>&& result) - : Success(true) - , Result(std::move(result)) -{} - ISolomonAccessorClient::TPtr ISolomonAccessorClient::Make( - const NYql::NSo::NProto::TDqSolomonSource& source, - std::shared_ptr<NYdb::ICredentialsProvider> credentialsProvider) -{ + NYql::NSo::NProto::TDqSolomonSource source, + std::shared_ptr<NYdb::ICredentialsProvider> credentialsProvider) { const auto& settings = source.settings(); TString defaultReplica = "sas"; @@ -388,7 +578,7 @@ ISolomonAccessorClient::Make( defaultGrpcPort = FromString<ui64>(it->second); } - return std::make_shared<TSolomonAccessorClient>(defaultReplica, defaultGrpcPort, source, credentialsProvider); + return std::make_shared<TSolomonAccessorClient>(defaultReplica, defaultGrpcPort, std::move(source), credentialsProvider); } } // namespace NYql::NSo diff --git a/ydb/library/yql/providers/solomon/solomon_accessor/client/solomon_accessor_client.h b/ydb/library/yql/providers/solomon/solomon_accessor/client/solomon_accessor_client.h index a680f29e7ce..87e1728e51c 100644 --- a/ydb/library/yql/providers/solomon/solomon_accessor/client/solomon_accessor_client.h +++ b/ydb/library/yql/providers/solomon/solomon_accessor/client/solomon_accessor_client.h @@ -1,31 +1,12 @@ #pragma once -#include <library/cpp/json/json_reader.h> #include <library/cpp/threading/future/core/future.h> #include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/types/credentials/credentials.h> #include <ydb/library/yql/providers/solomon/proto/dq_solomon_shard.pb.h> +#include <ydb/library/yql/providers/solomon/solomon_accessor/client/solomon_client_utils.h> namespace NYql::NSo { -class TMetric { -public: - explicit TMetric(const NJson::TJsonValue& value); - -public: - std::map<TString, TString> Labels; - TString Type; - TString CreatedAt; -}; - -class TTimeseries { -public: - TString Name; - std::map<TString, TString> Labels; - TString Type; - std::vector<int64_t> Timestamps; - std::vector<double> Values; -}; - class ISolomonAccessorClient { public: using TPtr = std::shared_ptr<ISolomonAccessorClient>; @@ -34,37 +15,15 @@ public: virtual ~ISolomonAccessorClient() = default; static TPtr Make( - const NYql::NSo::NProto::TDqSolomonSource& source, + NYql::NSo::NProto::TDqSolomonSource source, std::shared_ptr<NYdb::ICredentialsProvider> credentialsProvider); - class TListMetricsResult { - public: - explicit TListMetricsResult(); - explicit TListMetricsResult(const TString& errorMsg); - explicit TListMetricsResult(size_t pagesCount, std::vector<TMetric>&& result); - - public: - bool Success; - TString ErrorMsg; - size_t PagesCount; - std::vector<TMetric> Result; - }; - - class TGetDataResult { - public: - explicit TGetDataResult(); - explicit TGetDataResult(const TString& errorMsg); - explicit TGetDataResult(std::vector<TTimeseries>&& result); - - public: - bool Success; - TString ErrorMsg; - std::vector<TTimeseries> Result; - }; - public: - virtual NThreading::TFuture<TListMetricsResult> ListMetrics(const TString& selectors, int pageSize, int page) const = 0; - virtual NThreading::TFuture<TGetDataResult> GetData(const std::vector<TString>& selectors) const = 0; + virtual NThreading::TFuture<TGetLabelsResponse> GetLabelNames(const TString& selectors) const = 0; + virtual NThreading::TFuture<TListMetricsResponse> ListMetrics(const TString& selectors, int pageSize, int page) const = 0; + virtual NThreading::TFuture<TGetPointsCountResponse> GetPointsCount(const std::vector<TMetric>& metrics) const = 0; + virtual NThreading::TFuture<TGetDataResponse> GetData(TMetric metric, TInstant from, TInstant to) const = 0; + virtual NThreading::TFuture<TGetDataResponse> GetData(TString selectors, TInstant from, TInstant to) const = 0; }; } // namespace NYql::NSo diff --git a/ydb/library/yql/providers/solomon/solomon_accessor/client/solomon_client_utils.cpp b/ydb/library/yql/providers/solomon/solomon_accessor/client/solomon_client_utils.cpp new file mode 100644 index 00000000000..5356e508288 --- /dev/null +++ b/ydb/library/yql/providers/solomon/solomon_accessor/client/solomon_client_utils.cpp @@ -0,0 +1,26 @@ +#include "solomon_client_utils.h" + +#include <yql/essentials/utils/yql_panic.h> + +namespace NYql::NSo { + +template <typename T> +TSolomonClientResponse<T>::TSolomonClientResponse() + : Status(STATUS_RETRIABLE_ERROR) {} + +template <typename T> +TSolomonClientResponse<T>::TSolomonClientResponse(const TString& error) + : Status(STATUS_FATAL_ERROR) + , Error(error) {} + +template <typename T> +TSolomonClientResponse<T>::TSolomonClientResponse(T&& result) + : Status(STATUS_OK) + , Result(std::move(result)) {} + +template class TSolomonClientResponse<TGetLabelsResult>; +template class TSolomonClientResponse<TListMetricsResult>; +template class TSolomonClientResponse<TGetPointsCountResult>; +template class TSolomonClientResponse<TGetDataResult>; + +} // namespace NYql::NSo diff --git a/ydb/library/yql/providers/solomon/solomon_accessor/client/solomon_client_utils.h b/ydb/library/yql/providers/solomon/solomon_accessor/client/solomon_client_utils.h new file mode 100644 index 00000000000..6682d4e5989 --- /dev/null +++ b/ydb/library/yql/providers/solomon/solomon_accessor/client/solomon_client_utils.h @@ -0,0 +1,64 @@ +#pragma once + +#include <library/cpp/json/json_reader.h> + +#include <map> +#include <vector> + +namespace NYql::NSo { + +enum EStatus { + STATUS_OK, + STATUS_RETRIABLE_ERROR, + STATUS_FATAL_ERROR +}; + +template <typename T> +class TSolomonClientResponse { +public: + TSolomonClientResponse(); + explicit TSolomonClientResponse(const TString& error); + explicit TSolomonClientResponse(T&& result); + + TSolomonClientResponse(const TSolomonClientResponse&) = default; + TSolomonClientResponse(TSolomonClientResponse&&) = default; + +public: + EStatus Status; + TString Error; + T Result; +}; + +struct TMetric { + std::map<TString, TString> Labels; + TString Type; +}; + +struct TTimeseries { + TMetric Metric; + std::vector<int64_t> Timestamps; + std::vector<double> Values; +}; + +struct TGetLabelsResult { + std::vector<TString> Labels; +}; +using TGetLabelsResponse = TSolomonClientResponse<TGetLabelsResult>; + +struct TListMetricsResult { + std::vector<TMetric> Metrics; + ui64 PagesCount; +}; +using TListMetricsResponse = TSolomonClientResponse<TListMetricsResult>; + +struct TGetPointsCountResult { + std::vector<ui64> PointsCount; +}; +using TGetPointsCountResponse = TSolomonClientResponse<TGetPointsCountResult>; + +struct TGetDataResult { + std::vector<TTimeseries> Timeseries; +}; +using TGetDataResponse = TSolomonClientResponse<TGetDataResult>; + +} // namespace NYql::NSo diff --git a/ydb/library/yql/providers/solomon/solomon_accessor/client/ya.make b/ydb/library/yql/providers/solomon/solomon_accessor/client/ya.make index 0789f4ab7b5..9ecc766d0a1 100644 --- a/ydb/library/yql/providers/solomon/solomon_accessor/client/ya.make +++ b/ydb/library/yql/providers/solomon/solomon_accessor/client/ya.make @@ -2,6 +2,7 @@ LIBRARY() SRCS( solomon_accessor_client.cpp + solomon_client_utils.cpp ) PEERDIR( diff --git a/ydb/library/yql/providers/solomon/solomon_accessor/grpc/data_service.proto b/ydb/library/yql/providers/solomon/solomon_accessor/grpc/data_service.proto new file mode 100644 index 00000000000..947b1675ece --- /dev/null +++ b/ydb/library/yql/providers/solomon/solomon_accessor/grpc/data_service.proto @@ -0,0 +1,192 @@ +syntax = "proto3"; + +package yandex.monitoring.api.v3; + +import "google/protobuf/timestamp.proto"; +import "google/rpc/status.proto"; + +option go_package = "github.com/ydb-platform/ydb/ydb/library/yql/providers/solomon/solomon_accessor/grpc;solomon_accessor_pb"; + +// A set of methods for reading metrics data. +service DataService { + // Retrieves metric data based on multi-query input. + rpc Read(ReadRequest) returns (ReadResponse) { + } +} + +message Container { + // Default project/folder for this request. + oneof container { + // ID of the project that will be used as default. + // (unless it's specified in query) + string project_id = 1; + + // Cloud folder that will be used as default. + // (unless it's specified in query) + string folder_id = 2; + } +} + +// List of available aggregate functions for downsampling. +message Downsampling { + // List of available aggregate functions for downsampling. + enum GridAggregation { + GRID_AGGREGATION_UNSPECIFIED = 0; + + // Max value. + GRID_AGGREGATION_MAX = 1; + + // Min value. + GRID_AGGREGATION_MIN = 2; + + // Sum of values. + GRID_AGGREGATION_SUM = 3; + + // Average value. + GRID_AGGREGATION_AVG = 4; + + // Last value. + GRID_AGGREGATION_LAST = 5; + + // Total count of points. + GRID_AGGREGATION_COUNT = 6; + } + + // List of available gap filling policy for downsampling. + enum GapFilling { + GAP_FILLING_UNSPECIFIED = 0; + + // Returns `null` as a metric value and `timestamp` as a time series value. + GAP_FILLING_NULL = 1; + + // Returns no value and no timestamp. + GAP_FILLING_NONE = 2; + + // Returns the value from the previous time interval. + GAP_FILLING_PREVIOUS = 3; + } + + oneof mode { + // Maximum number of points to be returned. + int64 max_points = 1; + + // Time interval (grid) for downsampling in milliseconds. + // Points in the specified range are aggregated into one time point. + int64 grid_interval = 2; + + // Disable downsampling. + bool disabled = 3; + } + + // Function that is used for downsampling. + GridAggregation grid_aggregation = 4; + + // Parameters for filling gaps in data. + GapFilling gap_filling = 5; +} + +// Query used in multi-source requests for metrics retrieving. +message Query { + // Name of the query. + string name = 1; + + // Text of the program that will be executed. + string value = 2; + + // Flag for hiding result timeseries from response. + // Note: logic will still be executed, + // so result of the query processing can be used in following calculations + bool hidden = 3; +} + +message ReadRequest { + // Default container (project/folder) for this request. + Container container = 1; + + // Time from which data should be collected. + google.protobuf.Timestamp from_time = 3; + + // Time until which data should be collected. + google.protobuf.Timestamp to_time = 4; + + // Queries which are data gathered based on. + repeated Query queries = 5; + + // Downsampling parameters. + Downsampling downsampling = 6; + + // Force cluster. + string force_replica = 7; +} + +message ReadResponse { + // Data response for each not hidden query. + repeated ResponsePerQuery response_per_query = 1; + + // Error raised during request processing. + repeated google.rpc.Status errors = 2; +} + +message ResponsePerQuery { + message TimeseriesVector { + // Timeseries values. + repeated TimeseriesValue values = 1; + } + + // Name of the query. + string query_name = 1; + + // Result values for the query. + oneof result_values { + // Timeseries vector. + TimeseriesVector timeseries_vector = 2; + } +} + +enum MetricType { + METRIC_TYPE_UNSPECIFIED = 0; + + // Gauge with fractional values. + DGAUGE = 1; + + // Gauge with integer values. + IGAUGE = 2; + + // Counter. + COUNTER = 3; + + // Rate. + RATE = 4; +} + +message TimeseriesValue { + // Alias. + string alias = 1; + + // Name of the metric. + string name = 2; + + // List of metric labels as `key:value` pairs. + map<string, string> labels = 3; + + // Type of the metric. + MetricType type = 4; + + // List of timestamps. + Int64Vector timestamp_values = 5; + + oneof values { + // List of double values. + DoubleVector double_values = 6; + } +} + +message DoubleVector { + // List of double values. + repeated double values = 1; +} + +message Int64Vector { + // List of int64 values. + repeated int64 values = 1; +} diff --git a/ydb/library/yql/providers/solomon/solomon_accessor/grpc/ya.make b/ydb/library/yql/providers/solomon/solomon_accessor/grpc/ya.make index cc46d279fd5..7ee874031a5 100644 --- a/ydb/library/yql/providers/solomon/solomon_accessor/grpc/ya.make +++ b/ydb/library/yql/providers/solomon/solomon_accessor/grpc/ya.make @@ -9,7 +9,7 @@ ENDIF() GRPC() SRCS( - solomon_accessor_pb.proto + data_service.proto ) USE_COMMON_GOOGLE_APIS( diff --git a/ydb/library/yql/tests/sql/solomon/canondata/test.test_solomon-Basic-default.txt_/opt.yql b/ydb/library/yql/tests/sql/solomon/canondata/test.test_solomon-Basic-default.txt_/opt.yql index e699fa52b7b..96756a09c5b 100644 --- a/ydb/library/yql/tests/sql/solomon/canondata/test.test_solomon-Basic-default.txt_/opt.yql +++ b/ydb/library/yql/tests/sql/solomon/canondata/test.test_solomon-Basic-default.txt_/opt.yql @@ -4,7 +4,7 @@ (let $3 (DataType 'String)) (let $4 (StructType '('"labels" (DictType $3 $3)) '('"ts" (DataType 'Datetime)) '('type $3) '('"value" (DataType 'Double)))) (let $5 '('"labels" '"value" '"ts" 'type)) -(let $6 (SoSourceSettings world '"my_project" (SecureParam '"cluster:default_local_solomon") $4 $5 '() '"2023-12-08T14:40:39Z" '"2023-12-08T14:45:39Z" '"" '"{}" (Bool '"false") '"AVG" '"PREVIOUS" (Uint32 '"15"))) +(let $6 (SoSourceSettings world '"my_project" (SecureParam '"cluster:default_local_solomon") $4 $5 '() '"2023-12-08T14:40:39Z" '"2023-12-08T14:45:39Z" '"" '"{}" (Bool '"false") '"AVG" '"PREVIOUS" (Uint32 '"15") '())) (let $7 (DqStage '((DqSource (DataSource '"solomon" '"local_solomon") $6)) (lambda '($10) $10) '('('"_logical_id" '0)))) (let $8 (DqStage '((DqCnUnionAll (TDqOutput $7 '"0"))) (lambda '($11) $11) '('('"_logical_id" '0)))) (let $9 (ResPull! $1 $2 (Key) (DqCnResult (TDqOutput $8 '"0") '()) '('('type) '('autoref)) '"dq")) diff --git a/ydb/library/yql/tests/sql/solomon/canondata/test.test_solomon-Downsampling-default.txt_/opt.yql b/ydb/library/yql/tests/sql/solomon/canondata/test.test_solomon-Downsampling-default.txt_/opt.yql index ffdb5aec084..4a755e7e36f 100644 --- a/ydb/library/yql/tests/sql/solomon/canondata/test.test_solomon-Downsampling-default.txt_/opt.yql +++ b/ydb/library/yql/tests/sql/solomon/canondata/test.test_solomon-Downsampling-default.txt_/opt.yql @@ -4,7 +4,7 @@ (let $3 (DataType 'String)) (let $4 (StructType '('"labels" (DictType $3 $3)) '('"ts" (DataType 'Datetime)) '('type $3) '('"value" (DataType 'Double)))) (let $5 '('"labels" '"value" '"ts" 'type)) -(let $6 (SoSourceSettings world '"my_project" (SecureParam '"cluster:default_local_solomon") $4 $5 '() '"2023-12-08T14:40:39Z" '"2023-12-08T14:45:39Z" '"" '"{}" (Bool '"false") '"SUM" '"PREVIOUS" (Uint32 '"25"))) +(let $6 (SoSourceSettings world '"my_project" (SecureParam '"cluster:default_local_solomon") $4 $5 '() '"2023-12-08T14:40:39Z" '"2023-12-08T14:45:39Z" '"" '"{}" (Bool '"false") '"SUM" '"PREVIOUS" (Uint32 '"25") '())) (let $7 (DqStage '((DqSource (DataSource '"solomon" '"local_solomon") $6)) (lambda '($10) $10) '('('"_logical_id" '0)))) (let $8 (DqStage '((DqCnUnionAll (TDqOutput $7 '"0"))) (lambda '($11) $11) '('('"_logical_id" '0)))) (let $9 (ResPull! $1 $2 (Key) (DqCnResult (TDqOutput $8 '"0") '()) '('('type) '('autoref)) '"dq")) diff --git a/ydb/library/yql/tests/sql/solomon/canondata/test.test_solomon-DownsamplingValidSettings-default.txt_/opt.yql b/ydb/library/yql/tests/sql/solomon/canondata/test.test_solomon-DownsamplingValidSettings-default.txt_/opt.yql index 83000c22cb2..1a969a8de67 100644 --- a/ydb/library/yql/tests/sql/solomon/canondata/test.test_solomon-DownsamplingValidSettings-default.txt_/opt.yql +++ b/ydb/library/yql/tests/sql/solomon/canondata/test.test_solomon-DownsamplingValidSettings-default.txt_/opt.yql @@ -4,7 +4,7 @@ (let $3 (DataType 'String)) (let $4 (StructType '('"labels" (DictType $3 $3)) '('"ts" (DataType 'Datetime)) '('type $3) '('"value" (DataType 'Double)))) (let $5 '('"labels" '"value" '"ts" 'type)) -(let $6 (SoSourceSettings world '"my_project" (SecureParam '"cluster:default_local_solomon") $4 $5 '() '"2023-12-08T14:40:39Z" '"2023-12-08T14:45:39Z" '"" '"{}" (Bool '"false") '"SUM" '"PREVIOUS" (Uint32 '"15"))) +(let $6 (SoSourceSettings world '"my_project" (SecureParam '"cluster:default_local_solomon") $4 $5 '() '"2023-12-08T14:40:39Z" '"2023-12-08T14:45:39Z" '"" '"{}" (Bool '"false") '"SUM" '"PREVIOUS" (Uint32 '"15") '())) (let $7 (DqStage '((DqSource (DataSource '"solomon" '"local_solomon") $6)) (lambda '($10) $10) '('('"_logical_id" '0)))) (let $8 (DqStage '((DqCnUnionAll (TDqOutput $7 '"0"))) (lambda '($11) $11) '('('"_logical_id" '0)))) (let $9 (ResPull! $1 $2 (Key) (DqCnResult (TDqOutput $8 '"0") '()) '('('type) '('autoref)) '"dq")) diff --git a/ydb/library/yql/tests/sql/solomon/canondata/test.test_solomon-HistResponse-default.txt_/opt.yql b/ydb/library/yql/tests/sql/solomon/canondata/test.test_solomon-HistResponse-default.txt_/opt.yql index 972840ac7b1..6c61ea29c43 100644 --- a/ydb/library/yql/tests/sql/solomon/canondata/test.test_solomon-HistResponse-default.txt_/opt.yql +++ b/ydb/library/yql/tests/sql/solomon/canondata/test.test_solomon-HistResponse-default.txt_/opt.yql @@ -4,7 +4,7 @@ (let $3 (DataType 'String)) (let $4 (StructType '('"labels" (DictType $3 $3)) '('"ts" (DataType 'Datetime)) '('type $3) '('"value" (DataType 'Double)))) (let $5 '('"labels" '"value" '"ts" 'type)) -(let $6 (SoSourceSettings world '"hist" (SecureParam '"cluster:default_local_solomon") $4 $5 '() '"2023-12-08T14:40:39Z" '"2023-12-08T14:45:39Z" '"" '"histogram_percentile(95, {})" (Bool '"false") '"AVG" '"PREVIOUS" (Uint32 '"15"))) +(let $6 (SoSourceSettings world '"hist" (SecureParam '"cluster:default_local_solomon") $4 $5 '() '"2023-12-08T14:40:39Z" '"2023-12-08T14:45:39Z" '"" '"histogram_percentile(95, {})" (Bool '"false") '"AVG" '"PREVIOUS" (Uint32 '"15") '())) (let $7 (DqStage '((DqSource (DataSource '"solomon" '"local_solomon") $6)) (lambda '($10) $10) '('('"_logical_id" '0)))) (let $8 (DqStage '((DqCnUnionAll (TDqOutput $7 '"0"))) (lambda '($11) $11) '('('"_logical_id" '0)))) (let $9 (ResPull! $1 $2 (Key) (DqCnResult (TDqOutput $8 '"0") '()) '('('type) '('autoref)) '"dq")) diff --git a/ydb/library/yql/tests/sql/solomon/canondata/test.test_solomon-LabelColumns-default.txt_/opt.yql b/ydb/library/yql/tests/sql/solomon/canondata/test.test_solomon-LabelColumns-default.txt_/opt.yql index 9eb080067b6..ba0fe486fab 100644 --- a/ydb/library/yql/tests/sql/solomon/canondata/test.test_solomon-LabelColumns-default.txt_/opt.yql +++ b/ydb/library/yql/tests/sql/solomon/canondata/test.test_solomon-LabelColumns-default.txt_/opt.yql @@ -8,7 +8,7 @@ (let $7 (StructType '($3 $4) '($5 $4) '($6 $4) '('"project" $4) '('"ts" (DataType 'Datetime)) '('type $4) '('"value" (DataType 'Double)))) (let $8 '('"value" '"ts" 'type)) (let $9 '($3 $5 '"project" $6)) -(let $10 (SoSourceSettings world '"my_project" (SecureParam '"cluster:default_local_solomon") $7 $8 $9 '"2023-12-08T14:40:39Z" '"2023-12-08T14:45:39Z" '"" '"{}" (Bool '"false") '"AVG" '"PREVIOUS" (Uint32 '"15"))) +(let $10 (SoSourceSettings world '"my_project" (SecureParam '"cluster:default_local_solomon") $7 $8 $9 '"2023-12-08T14:40:39Z" '"2023-12-08T14:45:39Z" '"" '"{}" (Bool '"false") '"AVG" '"PREVIOUS" (Uint32 '"15") '())) (let $11 (DqStage '((DqSource (DataSource '"solomon" '"local_solomon") $10)) (lambda '($14) $14) '('('"_logical_id" '0)))) (let $12 (DqStage '((DqCnUnionAll (TDqOutput $11 '"0"))) (lambda '($15) $15) '('('"_logical_id" '0)))) (let $13 (ResPull! $1 $2 (Key) (DqCnResult (TDqOutput $12 '"0") '()) '('('type) '('autoref)) '"dq")) diff --git a/ydb/library/yql/tests/sql/solomon/canondata/test.test_solomon-Subquery-default.txt_/opt.yql b/ydb/library/yql/tests/sql/solomon/canondata/test.test_solomon-Subquery-default.txt_/opt.yql index 5b038c22f4e..508f856cab9 100644 --- a/ydb/library/yql/tests/sql/solomon/canondata/test.test_solomon-Subquery-default.txt_/opt.yql +++ b/ydb/library/yql/tests/sql/solomon/canondata/test.test_solomon-Subquery-default.txt_/opt.yql @@ -4,7 +4,7 @@ (let $3 (DataType 'String)) (let $4 (StructType '('"labels" (DictType $3 $3)) '('"ts" (DataType 'Datetime)) '('type $3) '('"value" (DataType 'Double)))) (let $5 '('"labels" '"value" '"ts" 'type)) -(let $6 (SoSourceSettings world '"my_project" (SecureParam '"cluster:default_local_solomon") $4 $5 '() '"2023-12-08T14:40:39Z" '"2023-12-08T14:45:39Z" '"" '"{}" (Bool '"false") '"SUM" '"PREVIOUS" (Uint32 '"25"))) +(let $6 (SoSourceSettings world '"my_project" (SecureParam '"cluster:default_local_solomon") $4 $5 '() '"2023-12-08T14:40:39Z" '"2023-12-08T14:45:39Z" '"" '"{}" (Bool '"false") '"SUM" '"PREVIOUS" (Uint32 '"25") '())) (let $7 (DqStage '((DqSource (DataSource '"solomon" '"local_solomon") $6)) (lambda '($11) $11) '('('"_logical_id" '0)))) (let $8 (DqStage '((DqCnUnionAll (TDqOutput $7 '"0"))) (lambda '($12) $12) '('('"_logical_id" '0)))) (let $9 '('('type) '('autoref) '('unordered))) diff --git a/ydb/library/yql/tools/solomon_emulator_grpc/__main__.py b/ydb/library/yql/tools/solomon_emulator_grpc/__main__.py index 0669b8cd424..9ba01f1f6f0 100644 --- a/ydb/library/yql/tools/solomon_emulator_grpc/__main__.py +++ b/ydb/library/yql/tools/solomon_emulator_grpc/__main__.py @@ -7,9 +7,9 @@ from library.python.testing.recipe import declare_recipe, set_env from library.recipes.common import find_free_ports import grpc -from ydb.library.yql.providers.solomon.solomon_accessor.grpc.solomon_accessor_pb_pb2_grpc import \ +from ydb.library.yql.providers.solomon.solomon_accessor.grpc.data_service_pb2_grpc import \ DataServiceServicer, add_DataServiceServicer_to_server -from ydb.library.yql.providers.solomon.solomon_accessor.grpc.solomon_accessor_pb_pb2 import ReadRequest, ReadResponse, MetricType +from ydb.library.yql.providers.solomon.solomon_accessor.grpc.data_service_pb2 import ReadRequest, ReadResponse, MetricType PID_FILENAME = "solomon_recipe.pid" |