aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorIvan Sukhov <evanevannnn@ydb.tech>2025-03-25 18:48:02 +0300
committerGitHub <noreply@github.com>2025-03-25 18:48:02 +0300
commite88b590555a32f7c43cc0a78612059b769fca2af (patch)
tree1a0648781afacd0342b0d657b8a3a71bcf413764
parent320040026605888ad720d8065822a442a27d221f (diff)
downloadydb-e88b590555a32f7c43cc0a78612059b769fca2af.tar.gz
Solomon read actor refactoring (#15662)
-rw-r--r--ydb/library/yql/providers/solomon/actors/dq_solomon_metrics_queue.cpp47
-rw-r--r--ydb/library/yql/providers/solomon/actors/dq_solomon_read_actor.cpp304
-rw-r--r--ydb/library/yql/providers/solomon/events/events.h48
-rw-r--r--ydb/library/yql/providers/solomon/expr_nodes/yql_solomon_expr_nodes.json8
-rw-r--r--ydb/library/yql/providers/solomon/proto/dq_solomon_shard.proto1
-rw-r--r--ydb/library/yql/providers/solomon/proto/metrics_queue.proto18
-rw-r--r--ydb/library/yql/providers/solomon/provider/ya.make1
-rw-r--r--ydb/library/yql/providers/solomon/provider/yql_solomon_config.cpp1
-rw-r--r--ydb/library/yql/providers/solomon/provider/yql_solomon_config.h1
-rw-r--r--ydb/library/yql/providers/solomon/provider/yql_solomon_datasource_type_ann.cpp7
-rw-r--r--ydb/library/yql/providers/solomon/provider/yql_solomon_dq_integration.cpp106
-rw-r--r--ydb/library/yql/providers/solomon/provider/yql_solomon_io_discovery.cpp2
-rw-r--r--ydb/library/yql/providers/solomon/provider/yql_solomon_load_meta.cpp127
-rw-r--r--ydb/library/yql/providers/solomon/provider/yql_solomon_provider_impl.h1
-rw-r--r--ydb/library/yql/providers/solomon/solomon_accessor/client/solomon_accessor_client.cpp594
-rw-r--r--ydb/library/yql/providers/solomon/solomon_accessor/client/solomon_accessor_client.h55
-rw-r--r--ydb/library/yql/providers/solomon/solomon_accessor/client/solomon_client_utils.cpp26
-rw-r--r--ydb/library/yql/providers/solomon/solomon_accessor/client/solomon_client_utils.h64
-rw-r--r--ydb/library/yql/providers/solomon/solomon_accessor/client/ya.make1
-rw-r--r--ydb/library/yql/providers/solomon/solomon_accessor/grpc/data_service.proto192
-rw-r--r--ydb/library/yql/providers/solomon/solomon_accessor/grpc/ya.make2
-rw-r--r--ydb/library/yql/tests/sql/solomon/canondata/test.test_solomon-Basic-default.txt_/opt.yql2
-rw-r--r--ydb/library/yql/tests/sql/solomon/canondata/test.test_solomon-Downsampling-default.txt_/opt.yql2
-rw-r--r--ydb/library/yql/tests/sql/solomon/canondata/test.test_solomon-DownsamplingValidSettings-default.txt_/opt.yql2
-rw-r--r--ydb/library/yql/tests/sql/solomon/canondata/test.test_solomon-HistResponse-default.txt_/opt.yql2
-rw-r--r--ydb/library/yql/tests/sql/solomon/canondata/test.test_solomon-LabelColumns-default.txt_/opt.yql2
-rw-r--r--ydb/library/yql/tests/sql/solomon/canondata/test.test_solomon-Subquery-default.txt_/opt.yql2
-rw-r--r--ydb/library/yql/tools/solomon_emulator_grpc/__main__.py4
28 files changed, 1180 insertions, 442 deletions
diff --git a/ydb/library/yql/providers/solomon/actors/dq_solomon_metrics_queue.cpp b/ydb/library/yql/providers/solomon/actors/dq_solomon_metrics_queue.cpp
index 75c7ccb1671..b3570cf19d4 100644
--- a/ydb/library/yql/providers/solomon/actors/dq_solomon_metrics_queue.cpp
+++ b/ydb/library/yql/providers/solomon/actors/dq_solomon_metrics_queue.cpp
@@ -47,9 +47,9 @@ public:
"expected EvEnd <= EventSpaceEnd(TEvents::ES_PRIVATE)");
struct TEvNextListingChunkReceived : public NActors::TEventLocal<TEvNextListingChunkReceived, EvNextListingChunkReceived> {
- NSo::ISolomonAccessorClient::TListMetricsResult ListingResult;
- TEvNextListingChunkReceived(NSo::ISolomonAccessorClient::TListMetricsResult listingResult)
- : ListingResult(std::move(listingResult)) {};
+ NSo::TListMetricsResponse Response;
+ explicit TEvNextListingChunkReceived(NSo::TListMetricsResponse&& response)
+ : Response(std::move(response)) {}
};
struct TEvRoundRobinStageTimeout : public NActors::TEventLocal<TEvRoundRobinStageTimeout, EvRoundRobinStageTimeout> {
@@ -90,6 +90,7 @@ public:
STATEFN(ThereAreMetricsToListState) {
try {
switch (const auto etype = ev->GetTypeRewrite()) {
+ hFunc(TEvSolomonProvider::TEvUpdateConsumersCount, HandleUpdateConsumersCount);
hFunc(TEvSolomonProvider::TEvGetNextBatch, HandleGetNextBatch);
hFunc(TEvPrivatePrivate::TEvNextListingChunkReceived, HandleNextListingChunkReceived);
cFunc(TEvPrivatePrivate::EvRoundRobinStageTimeout, HandleRoundRobinStageTimeout);
@@ -109,6 +110,7 @@ public:
STATEFN(NoMoreMetricsState) {
try {
switch (const auto etype = ev->GetTypeRewrite()) {
+ hFunc(TEvSolomonProvider::TEvUpdateConsumersCount, HandleUpdateConsumersCount);
hFunc(TEvSolomonProvider::TEvGetNextBatch, HandleGetNextBatchForEmptyState);
cFunc(TEvPrivatePrivate::EvRoundRobinStageTimeout, HandleRoundRobinStageTimeout);
cFunc(NActors::TEvents::TSystem::Poison, HandlePoison);
@@ -126,6 +128,7 @@ public:
STATEFN(AnErrorOccurredState) {
try {
switch (const auto etype = ev->GetTypeRewrite()) {
+ hFunc(TEvSolomonProvider::TEvUpdateConsumersCount, HandleUpdateConsumersCount);
hFunc(TEvSolomonProvider::TEvGetNextBatch, HandleGetNextBatchForErrorState);
cFunc(TEvPrivatePrivate::EvRoundRobinStageTimeout, HandleRoundRobinStageTimeout);
cFunc(NActors::TEvents::TSystem::Poison, HandlePoison);
@@ -141,6 +144,15 @@ public:
}
private:
+ void HandleUpdateConsumersCount(TEvSolomonProvider::TEvUpdateConsumersCount::TPtr& ev) {
+ if (const auto [it, inserted] = UpdatedConsumers.emplace(ev->Sender); inserted) {
+ LOG_D("TDqSolomonMetricsQueueActor",
+ "HandleUpdateConsumersCount Reducing ConsumersCount by " << ev->Get()->Record.GetConsumersCountDelta() << ", received from " << ev->Sender);
+ ConsumersCount -= ev->Get()->Record.GetConsumersCountDelta();
+ }
+ Send(ev->Sender, new TEvSolomonProvider::TEvAck(ev->Get()->Record.GetTransportMeta()));
+ }
+
void HandleGetNextBatch(TEvSolomonProvider::TEvGetNextBatch::TPtr& ev) {
if (HasEnoughToSend()) {
LOG_I("TDqSolomonMetricsQueueActor", "HandleGetNextBatch has enough metrics to send, trying to send them");
@@ -157,7 +169,7 @@ private:
YQL_ENSURE(ListingFuture.Defined());
ListingFuture = Nothing();
LOG_D("TDqSolomonMetricsQueueActor", "HandleNextListingChunkReceived");
- if (SaveRetrievedResults(ev->Get()->ListingResult)) {
+ if (SaveRetrievedResults(ev->Get()->Response)) {
AnswerPendingRequests(true);
if (!HasPendingRequests) {
LOG_D("TDqSolomonMetricsQueueActor", "HandleNextListingChunkReceived no pending requests. Trying to prefetch");
@@ -213,22 +225,22 @@ private:
Become(&TDqSolomonMetricsQueueActor::AnErrorOccurredState);
}
- bool SaveRetrievedResults(const NSo::ISolomonAccessorClient::TListMetricsResult& listingResult) {
+ bool SaveRetrievedResults(const NSo::TListMetricsResponse& response) {
LOG_T("TDqSolomonMetricsQueueActor", "SaveRetrievedResults");
- if (!listingResult.Success) {
- MaybeIssues = listingResult.ErrorMsg;
+ if (response.Status != NSo::EStatus::STATUS_OK) {
+ MaybeIssues = response.Error;
return false;
}
- if (CurrentPage >= listingResult.PagesCount) {
+ if (CurrentPage >= response.Result.PagesCount) {
LOG_I("TDqSolomonMetricsQueueActor", "SaveRetrievedResults no more metrics to list");
HasMoreMetrics = false;
Become(&TDqSolomonMetricsQueueActor::NoMoreMetricsState);
}
- LOG_D("TDqSolomonMetricsQueueActor", "SaveRetrievedResults saving: " << listingResult.Result.size() << " metrics");
- for (const auto& metric : listingResult.Result) {
- NSo::MetricQueue::TMetricLabels protoMetric;
+ LOG_D("TDqSolomonMetricsQueueActor", "SaveRetrievedResults saving: " << response.Result.Metrics.size() << " metrics");
+ for (const auto& metric : response.Result.Metrics) {
+ NSo::MetricQueue::TMetric protoMetric;
protoMetric.SetType(metric.Type);
protoMetric.MutableLabels()->insert(metric.Labels.begin(), metric.Labels.end());
Metrics.emplace_back(std::move(protoMetric));
@@ -266,12 +278,11 @@ private:
SolomonClient
->ListMetrics(ReadParams.Source.GetSelectors(), PageSize, CurrentPage++)
.Subscribe([actorSystem, selfId = SelfId()](
- const NThreading::TFuture<NSo::ISolomonAccessorClient::TListMetricsResult>& future) -> void {
+ NThreading::TFuture<NSo::TListMetricsResponse> future) -> void {
try {
actorSystem->Send(
selfId,
- new TEvPrivatePrivate::TEvNextListingChunkReceived(
- std::move(future.GetValue())));
+ new TEvPrivatePrivate::TEvNextListingChunkReceived(future.ExtractValue()));
} catch (const std::exception& e) {
actorSystem->Send(
selfId,
@@ -347,7 +358,8 @@ private:
void SendMetrics(const NActors::TActorId& consumer, const NDqProto::TMessageTransportMeta& transportMeta) {
YQL_ENSURE(!MaybeIssues.Defined());
- std::vector<NSo::MetricQueue::TMetricLabels> result;
+ std::vector<NSo::MetricQueue::TMetric> result;
+ result.reserve(std::min<ui64>(BatchCountLimit, Metrics.size()));
while (!Metrics.empty() && result.size() < BatchCountLimit) {
result.push_back(Metrics.back());
Metrics.pop_back();
@@ -396,13 +408,14 @@ private:
bool IsRoundRobinFinishScheduled = false;
bool RoundRobinStageFinished = false;
THashSet<NActors::TActorId> StartedConsumers;
+ THashSet<NActors::TActorId> UpdatedConsumers;
THashSet<NActors::TActorId> FinishedConsumers;
THashMap<NActors::TActorId, ui64> FinishingConsumerToLastSeqNo;
- TMaybe<NThreading::TFuture<NSo::ISolomonAccessorClient::TListMetricsResult>> ListingFuture;
+ TMaybe<NThreading::TFuture<NSo::TListMetricsResponse>> ListingFuture;
bool HasPendingRequests;
THashMap<NActors::TActorId, TDeque<NDqProto::TMessageTransportMeta>> PendingRequests;
- std::vector<NSo::MetricQueue::TMetricLabels> Metrics;
+ std::vector<NSo::MetricQueue::TMetric> Metrics;
TMaybe<TString> MaybeIssues;
ui64 PageSize;
diff --git a/ydb/library/yql/providers/solomon/actors/dq_solomon_read_actor.cpp b/ydb/library/yql/providers/solomon/actors/dq_solomon_read_actor.cpp
index a607eb88e17..953d8bdeb22 100644
--- a/ydb/library/yql/providers/solomon/actors/dq_solomon_read_actor.cpp
+++ b/ydb/library/yql/providers/solomon/actors/dq_solomon_read_actor.cpp
@@ -3,6 +3,7 @@
#include <library/cpp/protobuf/util/pb_io.h>
+#include <util/string/join.h>
#include <ydb/library/yql/dq/actors/common/retry_queue.h>
#include <ydb/library/yql/providers/solomon/actors/dq_solomon_metrics_queue.h>
#include <ydb/library/yql/providers/solomon/events/events.h>
@@ -65,29 +66,14 @@ using namespace NKikimr::NMiniKQL;
namespace {
-enum ESystemColumn{
- SC_KIND = 0,
- SC_LABELS,
- SC_VALUE,
- SC_TYPE,
- SC_TS
-};
-
-auto RetryPolicy = NYql::NDq::THttpSenderRetryPolicy::GetExponentialBackoffPolicy(
- [](const NHttp::TEvHttpProxy::TEvHttpIncomingResponse* resp){
- if (!resp || !resp->Response) {
- // Connection wasn't established. Should retry.
- return ERetryErrorClass::ShortRetry;
- }
-
- if (resp->Response->Status == "401") {
- return ERetryErrorClass::NoRetry;
- }
-
- return ERetryErrorClass::ShortRetry;
- });
-
class TDqSolomonReadActor : public NActors::TActorBootstrapped<TDqSolomonReadActor>, public IDqComputeActorAsyncInput {
+private:
+ struct TMetricTimeRange {
+ NSo::TMetric Metric;
+ TInstant From;
+ TInstant To;
+ };
+
public:
static constexpr char ActorName[] = "DQ_SOLOMON_READ_ACTOR";
@@ -99,8 +85,8 @@ public:
const THolderFactory& holderFactory,
NKikimr::NMiniKQL::TProgramBuilder& programBuilder,
TDqSolomonReadParams&& readParams,
- ui64 maxInflightDataRequests,
ui64 computeActorBatchSize,
+ ui64 metricsQueueConsumersCountDelta,
NActors::TActorId metricsQueueActor,
const ::NMonitoring::TDynamicCounterPtr& counters,
std::shared_ptr<NYdb::ICredentialsProvider> credentialsProvider
@@ -112,12 +98,13 @@ public:
, ProgramBuilder(programBuilder)
, LogPrefix(TStringBuilder() << "TxId: " << TxId << ", TDqSolomonReadActor: ")
, ReadParams(std::move(readParams))
- , MaxInflightDataRequests(maxInflightDataRequests)
, ComputeActorBatchSize(computeActorBatchSize)
+ , MetricsQueueConsumersCountDelta(metricsQueueConsumersCountDelta)
, MetricsQueueActor(metricsQueueActor)
, CredentialsProvider(credentialsProvider)
, SolomonClient(NSo::ISolomonAccessorClient::Make(ReadParams.Source, CredentialsProvider))
{
+ assert(MaxPointsPerOneMetric != 0);
Y_UNUSED(counters);
SOURCE_LOG_D("Init");
IngressStats.Level = statsLevel;
@@ -141,26 +128,38 @@ public:
}
void Bootstrap() {
- Become(&TDqSolomonReadActor::StateFunc);
if (UseMetricsQueue) {
+ Become(&TDqSolomonReadActor::LimitlessModeState);
MetricsQueueEvents.Init(TxId, SelfId(), SelfId());
MetricsQueueEvents.OnNewRecipientId(MetricsQueueActor);
+
+ if (MetricsQueueConsumersCountDelta > 0) {
+ MetricsQueueEvents.Send(new TEvSolomonProvider::TEvUpdateConsumersCount(MetricsQueueConsumersCountDelta));
+ }
+
RequestMetrics();
} else {
+ Become(&TDqSolomonReadActor::LimitedModeState);
RequestData();
}
}
- STRICT_STFUNC(StateFunc,
+ STRICT_STFUNC(LimitlessModeState,
hFunc(TEvSolomonProvider::TEvMetricsBatch, HandleMetricsBatch);
hFunc(TEvSolomonProvider::TEvMetricsReadError, HandleMetricsReadError);
+ hFunc(TEvSolomonProvider::TEvPointsCountBatch, HandlePointsCountBatch);
hFunc(TEvSolomonProvider::TEvNewDataBatch, HandleNewDataBatch);
+ hFunc(TEvSolomonProvider::TEvAck, Handle);
hFunc(NYql::NDq::TEvRetryQueuePrivate::TEvRetry, Handle);
hFunc(NActors::TEvInterconnect::TEvNodeDisconnected, Handle);
hFunc(NActors::TEvInterconnect::TEvNodeConnected, Handle);
hFunc(NActors::TEvents::TEvUndelivered, Handle);
)
+ STRICT_STFUNC(LimitedModeState,
+ hFunc(TEvSolomonProvider::TEvNewDataBatch, HandleNewDataBatchLimited);
+ )
+
void HandleMetricsBatch(TEvSolomonProvider::TEvMetricsBatch::TPtr& metricsBatch) {
if (!MetricsQueueEvents.OnEventReceived(metricsBatch)) {
return;
@@ -176,13 +175,16 @@ public:
IsConfirmedMetricsQueueFinish = true;
}
- auto& metrics = batch.GetMetrics();
+ auto& listedMetrics = batch.GetMetrics();
- SOURCE_LOG_D("HandleMetricsBatch batch of size " << metrics.size());
- Metrics.insert(Metrics.end(), metrics.begin(), metrics.end());
- ListedMetrics += metrics.size();
+ SOURCE_LOG_D("HandleMetricsBatch batch of size " << listedMetrics.size());
+ for (const auto& metric : listedMetrics) {
+ std::map<TString, TString> labels(metric.GetLabels().begin(), metric.GetLabels().end());
+ ListedMetrics.emplace_back(std::move(labels), metric.GetType());
+ }
+ ListedMetricsCount += listedMetrics.size();
- while (TryRequestData()) {}
+ while (TryRequestPointsCount()) {}
if (LastMetricProcessed()) {
NotifyComputeActorWithData();
@@ -202,32 +204,58 @@ public:
}
TIssues issues { TIssue(metricsReadError->Get()->Record.GetIssues()) };
- SOURCE_LOG_W("Got " << "error response[" << metricsReadError->Cookie << "] from solomon: " << issues.ToOneLineString());
+ SOURCE_LOG_W("Got " << "error list metrics response[" << metricsReadError->Cookie << "] from solomon: " << issues.ToOneLineString());
Send(ComputeActorId, new TEvAsyncInputError(InputIndex, issues, NYql::NDqProto::StatusIds::EXTERNAL_ERROR));
return;
}
+ void HandlePointsCountBatch(TEvSolomonProvider::TEvPointsCountBatch::TPtr& pointsCountBatch) {
+ auto& batch = *pointsCountBatch->Get();
+
+ if (batch.Response.Status != NSo::EStatus::STATUS_OK) {
+ TIssues issues { TIssue(batch.Response.Error) };
+ SOURCE_LOG_W("Got " << "error points count response[" << pointsCountBatch->Cookie << "] from solomon: " << issues.ToOneLineString());
+ Send(ComputeActorId, new TEvAsyncInputError(InputIndex, issues, NYql::NDqProto::StatusIds::EXTERNAL_ERROR));
+ return;
+ }
+
+ auto& metrics = batch.Metrics;
+ auto& pointsCount = batch.Response.Result.PointsCount;
+ ParsePointsCount(metrics, pointsCount);
+
+ SOURCE_LOG_D("HandlePointsCountBatch batch of size " << metrics.size());
+ TryRequestData();
+ }
+
void HandleNewDataBatch(TEvSolomonProvider::TEvNewDataBatch::TPtr& newDataBatch) {
auto& batch = *newDataBatch->Get();
- InflightDataRequests--;
- if (!batch.Result.Success) {
- TIssues issues { TIssue(batch.Result.ErrorMsg) };
- SOURCE_LOG_W("Got " << "error response[" << newDataBatch->Cookie << "] from solomon: " << issues.ToOneLineString());
+ if (batch.Response.Status == NSo::EStatus::STATUS_FATAL_ERROR) {
+ TIssues issues { TIssue(batch.Response.Error) };
+ SOURCE_LOG_W("Got " << "error data response[" << newDataBatch->Cookie << "] from solomon: " << issues.ToOneLineString());
Send(ComputeActorId, new TEvAsyncInputError(InputIndex, issues, NYql::NDqProto::StatusIds::EXTERNAL_ERROR));
return;
}
+ if (batch.Response.Status == NSo::EStatus::STATUS_RETRIABLE_ERROR) {
+ MetricsWithTimeRange.emplace_back(batch.Metric, batch.From, batch.To);
+ TryRequestData();
+ return;
+ }
- MetricsData.insert(MetricsData.end(), batch.Result.Result.begin(), batch.Result.Result.end());
- CompletedMetrics += batch.SelectorsCount;
+ MetricsData.insert(MetricsData.end(), batch.Response.Result.Timeseries.begin(), batch.Response.Result.Timeseries.end());
+ CompletedMetricsCount++;
- if (!Metrics.empty()) {
- while (TryRequestData()) {}
- } else if (MetricsData.size() >= ComputeActorBatchSize || LastMetricReceived()) {
+ if (!MetricsWithTimeRange.empty()) {
+ TryRequestData();
+ } else if (MetricsData.size() >= ComputeActorBatchSize || LastMetricProcessed()) {
NotifyComputeActorWithData();
}
}
+ void Handle(TEvSolomonProvider::TEvAck::TPtr& ev) {
+ MetricsQueueEvents.OnEventReceived(ev);
+ }
+
void Handle(const NYql::NDq::TEvRetryQueuePrivate::TEvRetry::TPtr&) {
SOURCE_LOG_D("Handle MetricsQueue retry");
MetricsQueueEvents.Retry();
@@ -251,13 +279,28 @@ public:
}
}
- i64 GetAsyncInputData(TUnboxedValueBatch& buffer, TMaybe<TInstant>&, bool& finished, i64 freeSpace) final {
- Y_UNUSED(freeSpace);
+ void HandleNewDataBatchLimited(TEvSolomonProvider::TEvNewDataBatch::TPtr& newDataBatch) {
+ auto& batch = *newDataBatch->Get();
+
+ if (batch.Response.Status != NSo::EStatus::STATUS_OK) {
+ TIssues issues { TIssue(batch.Response.Error) };
+ SOURCE_LOG_W("Got " << "error data response[" << newDataBatch->Cookie << "] from solomon: " << issues.ToOneLineString());
+ Send(ComputeActorId, new TEvAsyncInputError(InputIndex, issues, NYql::NDqProto::StatusIds::EXTERNAL_ERROR));
+ return;
+ }
+
+ MetricsData.insert(MetricsData.end(), batch.Response.Result.Timeseries.begin(), batch.Response.Result.Timeseries.end());
+ CompletedMetricsCount++;
+
+ NotifyComputeActorWithData();
+ }
+
+ i64 GetAsyncInputData(TUnboxedValueBatch& buffer, TMaybe<TInstant>&, bool& finished, i64) final {
YQL_ENSURE(!buffer.IsWide(), "Wide stream is not supported");
- SOURCE_LOG_D("GetAsyncInputData sending " << MetricsData.size() << " metrics");
+ SOURCE_LOG_D("GetAsyncInputData sending " << MetricsData.size() << " metrics, finished = " << LastMetricProcessed());
for (const auto& data : MetricsData) {
- auto& labels = data.Labels;
+ auto& labels = data.Metric.Labels;
auto dictValueBuilder = HolderFactory.NewDict(DictType, 0);
for (auto& [key, value] : labels) {
@@ -267,16 +310,12 @@ public:
auto& timestamps = data.Timestamps;
auto& values = data.Values;
- auto& type = data.Type;
+ auto& type = data.Metric.Type;
for (size_t i = 0; i < timestamps.size(); ++i){
NUdf::TUnboxedValue* items = nullptr;
auto value = HolderFactory.CreateDirectArrayHolder(ReadParams.Source.GetSystemColumns().size() + ReadParams.Source.GetLabelNames().size(), items);
- if (auto it = Index.find(SOLOMON_SCHEME_LABELS); it != Index.end()) {
- items[it->second] = dictValue;
- }
-
if (auto it = Index.find(SOLOMON_SCHEME_VALUE); it != Index.end()) {
items[it->second] = NUdf::TUnboxedValuePod(values[i]);
}
@@ -290,6 +329,10 @@ public:
items[it->second] = NUdf::TUnboxedValuePod((ui64)timestamps[i] / 1000);
}
+ if (auto it = Index.find(SOLOMON_SCHEME_LABELS); it != Index.end()) {
+ items[it->second] = dictValue;
+ }
+
for (const auto& c : ReadParams.Source.GetLabelNames()) {
auto& v = items[Index[c]];
auto it = labels.find(c);
@@ -325,7 +368,10 @@ public:
private:
// IActor & IDqComputeActorAsyncInput
void PassAway() override { // Is called from Compute Actor
- SOURCE_LOG_I("PassAway, processed " << CompletedMetrics << " metrics.");
+ SOURCE_LOG_I("PassAway, processed " << CompletedMetricsCount << " metrics.");
+ if (UseMetricsQueue) {
+ MetricsQueueEvents.Unsubscribe();
+ }
TActor<TDqSolomonReadActor>::PassAway();
}
@@ -337,24 +383,15 @@ private:
Send(ComputeActorId, new TEvNewAsyncInputDataArrived(InputIndex));
}
- bool LastMetricReceived() const {
- if (UseMetricsQueue) {
- return IsConfirmedMetricsQueueFinish && CompletedMetrics == ListedMetrics;
- } else {
- return CompletedMetrics == 1;
- }
- }
-
bool LastMetricProcessed() const {
if (UseMetricsQueue) {
- return IsConfirmedMetricsQueueFinish && CompletedMetrics == ListedMetrics;
- } else {
- return MetricsData.empty() && CompletedMetrics == 1;
+ return IsMetricsQueueEmpty && CompletedMetricsCount == ListedMetricsCount;
}
+ return CompletedMetricsCount == 1;
}
void TryRequestMetrics() {
- if (Metrics.size() < MetricsPerDataQuery * MaxInflightDataRequests && !IsMetricsQueueEmpty && !IsWaitingMetricsQueueResponse) {
+ if (ListedMetrics.size() < 1000 && !IsMetricsQueueEmpty && !IsWaitingMetricsQueueResponse) {
RequestMetrics();
}
}
@@ -364,55 +401,112 @@ private:
IsWaitingMetricsQueueResponse = true;
}
- bool TryRequestData() {
+ bool TryRequestPointsCount() {
TryRequestMetrics();
- if (Metrics.empty()) {
+ if (ListedMetrics.empty()) {
return false;
}
- if (InflightDataRequests >= MaxInflightDataRequests) {
- return false;
+ RequestPointsCount();
+ return true;
+ }
+
+ void RequestPointsCount() {
+ std::vector<NSo::TMetric> requestMetrics;
+ requestMetrics.reserve(std::min<ui64>(MetricsPerPointsCountQuery, ListedMetrics.size()));
+ while (!ListedMetrics.empty() && requestMetrics.size() < MetricsPerPointsCountQuery) {
+ requestMetrics.push_back(ListedMetrics.back());
+ ListedMetrics.pop_back();
}
- RequestData();
- return true;
+ auto getPointsCountFuture = SolomonClient->GetPointsCount(std::move(requestMetrics));
+
+ NActors::TActorSystem* actorSystem = NActors::TActivationContext::ActorSystem();
+ getPointsCountFuture.Subscribe([actorSystem, metrics = std::move(requestMetrics), selfId = SelfId()](
+ const NThreading::TFuture<NSo::TGetPointsCountResponse>& response) mutable -> void
+ {
+ actorSystem->Send(selfId, new TEvSolomonProvider::TEvPointsCountBatch(
+ std::move(metrics),
+ response.GetValue())
+ );
+ });
+ }
+
+ void TryRequestData() {
+ TryRequestPointsCount();
+ while (!MetricsWithTimeRange.empty()) {
+ RequestData();
+ TryRequestPointsCount();
+ }
}
void RequestData() {
- std::vector<TString> dataSelectors;
+ NThreading::TFuture<NSo::TGetDataResponse> getDataFuture;
+ NSo::TMetric metric;
+ TInstant from;
+ TInstant to;
+
if (UseMetricsQueue) {
- while (Metrics.size() > 0 && dataSelectors.size() < MetricsPerDataQuery) {
- dataSelectors.push_back(BuildSelectorsString(Metrics.back()));
- Metrics.pop_back();
- }
+ auto request = MetricsWithTimeRange.back();
+ MetricsWithTimeRange.pop_back();
+
+ metric = request.Metric;
+ from = request.From;
+ to = request.To;
+
+ getDataFuture = SolomonClient->GetData(metric, from, to);
} else {
- dataSelectors.push_back(ReadParams.Source.GetProgram());
+ getDataFuture = SolomonClient->GetData(
+ ReadParams.Source.GetProgram(),
+ TInstant::Seconds(ReadParams.Source.GetFrom()),
+ TInstant::Seconds(ReadParams.Source.GetTo())
+ );
}
- auto getDataFuture = SolomonClient->GetData(dataSelectors);
- InflightDataRequests++;
-
NActors::TActorSystem* actorSystem = NActors::TActivationContext::ActorSystem();
- getDataFuture.Subscribe([actorSystem, selectorsCount = dataSelectors.size(), selfId = SelfId()](
- const NThreading::TFuture<NSo::ISolomonAccessorClient::TGetDataResult>& result) -> void
+ getDataFuture.Subscribe([actorSystem, metric, from, to, selfId = SelfId()](
+ const NThreading::TFuture<NSo::TGetDataResponse>& response) -> void
{
- actorSystem->Send(selfId, new TEvSolomonProvider::TEvNewDataBatch(selectorsCount, result.GetValue()));
+ actorSystem->Send(selfId, new TEvSolomonProvider::TEvNewDataBatch(
+ metric,
+ from,
+ to,
+ response.GetValue())
+ );
});
}
- TString BuildSelectorsString(const NSo::MetricQueue::TMetricLabels& metric) const {
- TStringBuilder result;
- bool first = true;
+ void ParsePointsCount(const std::vector<NSo::TMetric>& metrics, const std::vector<ui64>& pointsCount) {
+ TInstant from = TInstant::Seconds(ReadParams.Source.GetFrom());
+ TInstant to = TInstant::Seconds(ReadParams.Source.GetTo());
+
+ for (size_t i = 0; i < metrics.size(); ++i) {
+ auto& metric = metrics[i];
+ auto& count = pointsCount[i];
- result << "{";
- for (const auto& [key, value] : metric.GetLabels()) {
- if (!first) {
- result << ",";
+ auto ranges = SplitTimeIntervalIntoRanges(from, to, count);
+ for (const auto& [fromRange, toRange] : ranges) {
+ MetricsWithTimeRange.emplace_back(metric, fromRange, toRange);
}
- first = false;
- result << key << "=\"" << value << "\"";
}
- result << "}";
+ }
+
+ std::vector<std::pair<TInstant, TInstant>> SplitTimeIntervalIntoRanges(TInstant from, TInstant to, ui64 pointsCount) const {
+ std::vector<std::pair<TInstant, TInstant>> result;
+ if (pointsCount == 0) {
+ return result;
+ }
+
+ result.reserve(pointsCount / MaxPointsPerOneMetric);
+ auto rangeDuration = to - from;
+ for (ui64 i = 0; i < pointsCount; i += MaxPointsPerOneMetric) {
+ double start = i;
+ double end = std::min(i + MaxPointsPerOneMetric, pointsCount);
+ result.emplace_back(
+ from + rangeDuration * start / pointsCount,
+ from + rangeDuration * end / pointsCount
+ );
+ }
return result;
}
@@ -426,8 +520,8 @@ private:
NKikimr::NMiniKQL::TProgramBuilder& ProgramBuilder;
const TString LogPrefix;
const TDqSolomonReadParams ReadParams;
- const ui64 MaxInflightDataRequests;
const ui64 ComputeActorBatchSize;
+ const ui64 MetricsQueueConsumersCountDelta;
bool UseMetricsQueue;
TRetryEventsQueue MetricsQueueEvents;
@@ -435,12 +529,14 @@ private:
bool IsWaitingMetricsQueueResponse = false;
bool IsMetricsQueueEmpty = false;
bool IsConfirmedMetricsQueueFinish = false;
- std::deque<NSo::MetricQueue::TMetricLabels> Metrics;
+
+ std::deque<NSo::TMetric> ListedMetrics;
+ std::deque<TMetricTimeRange> MetricsWithTimeRange;
std::deque<NSo::TTimeseries> MetricsData;
- size_t InflightDataRequests = 0;
- size_t ListedMetrics = 0;
- size_t CompletedMetrics = 0;
- const ui64 MetricsPerDataQuery = 15;
+ size_t ListedMetricsCount = 0;
+ size_t CompletedMetricsCount = 0;
+ const ui64 MetricsPerPointsCountQuery = 50;
+ const ui64 MaxPointsPerOneMetric = 1000000;
TString SourceId;
std::shared_ptr<NYdb::ICredentialsProvider> CredentialsProvider;
@@ -456,6 +552,7 @@ private:
std::pair<NYql::NDq::IDqComputeActorAsyncInput*, NActors::IActor*> CreateDqSolomonReadActor(
NYql::NSo::NProto::TDqSolomonSource&& source,
ui64 inputIndex,
+ ui64 metricsQueueConsumersCountDelta,
TCollectStatsLevel statsLevel,
const TTxId& txId,
const NActors::TActorId& computeActorId,
@@ -482,11 +579,6 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, NActors::IActor*> CreateDqSolom
metricsQueueActor = ActorIdFromProto(protoId);
}
- ui64 maxInflightDataRequests = 1;
- if (auto it = settings.find("maxInflightDataRequests"); it != settings.end()) {
- maxInflightDataRequests = FromString<ui64>(it->second);
- }
-
ui64 computeActorBatchSize = 1;
if (auto it = settings.find("computeActorBatchSize"); it != settings.end()) {
computeActorBatchSize = FromString<ui64>(it->second);
@@ -503,8 +595,8 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, NActors::IActor*> CreateDqSolom
holderFactory,
programBuilder,
std::move(params),
- maxInflightDataRequests,
computeActorBatchSize,
+ metricsQueueConsumersCountDelta,
metricsQueueActor,
counters,
credentialsProvider);
@@ -519,9 +611,15 @@ void RegisterDQSolomonReadActorFactory(TDqAsyncIoFactory& factory, ISecuredServi
{
auto counters = MakeIntrusive<::NMonitoring::TDynamicCounters>();
+ ui64 metricsQueueConsumersCountDelta = 0;
+ if (args.ReadRanges.size() > 1) {
+ metricsQueueConsumersCountDelta = args.ReadRanges.size() - 1;
+ }
+
return CreateDqSolomonReadActor(
std::move(settings),
args.InputIndex,
+ metricsQueueConsumersCountDelta,
args.StatsLevel,
args.TxId,
args.ComputeActorId,
diff --git a/ydb/library/yql/providers/solomon/events/events.h b/ydb/library/yql/providers/solomon/events/events.h
index 3fd23f5ae97..180494ac211 100644
--- a/ydb/library/yql/providers/solomon/events/events.h
+++ b/ydb/library/yql/providers/solomon/events/events.h
@@ -12,16 +12,36 @@ struct TEvSolomonProvider {
EvBegin = EventSpaceBegin(NKikimr::TKikimrEvents::ES_SOLOMON_PROVIDER),
// lister events
- EvGetNextBatch = EvBegin,
+ EvUpdateConsumersCount = EvBegin,
+ EvAck,
+ EvGetNextBatch,
EvMetricsBatch,
EvMetricsReadError,
// read actor events
+ EvPointsCountBatch,
EvNewDataBatch,
EvEnd
};
- static_assert(EvEnd < EventSpaceEnd(NKikimr::TKikimrEvents::ES_SOLOMON_PROVIDER), "expect EvEnd < EventSpaceEnd(TEvents::ES_S3_PROVIDER)");
+ static_assert(EvEnd < EventSpaceEnd(NKikimr::TKikimrEvents::ES_SOLOMON_PROVIDER), "expect EvEnd < EventSpaceEnd(NKikimr::TKikimrEvents::ES_SOLOMON_PROVIDER)");
+
+ struct TEvUpdateConsumersCount :
+ public NActors::TEventPB<TEvUpdateConsumersCount, NSo::MetricQueue::TEvUpdateConsumersCount, EvUpdateConsumersCount> {
+
+ explicit TEvUpdateConsumersCount(ui64 consumersCountDelta = 0) {
+ Record.SetConsumersCountDelta(consumersCountDelta);
+ }
+ };
+
+ struct TEvAck :
+ public NActors::TEventPB<TEvAck, NSo::MetricQueue::TEvAck, EvAck> {
+
+ TEvAck() = default;
+ explicit TEvAck(const NDqProto::TMessageTransportMeta& transportMeta) {
+ *Record.MutableTransportMeta() = transportMeta;
+ }
+ };
struct TEvGetNextBatch :
public NActors::TEventPB<TEvGetNextBatch, NSo::MetricQueue::TEvGetNextBatch, EvGetNextBatch> {
@@ -31,7 +51,7 @@ struct TEvSolomonProvider {
public NActors::TEventPB<TEvMetricsBatch, NSo::MetricQueue::TEvMetricsBatch, EvMetricsBatch> {
TEvMetricsBatch() = default;
- TEvMetricsBatch(std::vector<NSo::MetricQueue::TMetricLabels> metrics, bool noMoreMetrics, const NDqProto::TMessageTransportMeta& transportMeta) {
+ TEvMetricsBatch(std::vector<NSo::MetricQueue::TMetric> metrics, bool noMoreMetrics, const NDqProto::TMessageTransportMeta& transportMeta) {
Record.MutableMetrics()->Assign(
metrics.begin(),
metrics.end());
@@ -50,12 +70,24 @@ struct TEvSolomonProvider {
}
};
+ struct TEvPointsCountBatch : public NActors::TEventLocal<TEvPointsCountBatch, EvPointsCountBatch> {
+ std::vector<NSo::TMetric> Metrics;
+ NSo::TGetPointsCountResponse Response;
+ TEvPointsCountBatch(std::vector<NSo::TMetric>&& metrics, const NSo::TGetPointsCountResponse& response)
+ : Metrics(std::move(metrics))
+ , Response(response)
+ {}
+ };
+
struct TEvNewDataBatch: public NActors::TEventLocal<TEvNewDataBatch, EvNewDataBatch> {
- ui64 SelectorsCount;
- NSo::ISolomonAccessorClient::TGetDataResult Result;
- TEvNewDataBatch(ui64 selectorsCount, NSo::ISolomonAccessorClient::TGetDataResult result)
- : SelectorsCount(selectorsCount)
- , Result(std::move(result))
+ NSo::TMetric Metric;
+ TInstant From, To;
+ NSo::TGetDataResponse Response;
+ TEvNewDataBatch(NSo::TMetric metric, TInstant from, TInstant to, const NSo::TGetDataResponse& response)
+ : Metric(metric)
+ , From(from)
+ , To(to)
+ , Response(response)
{}
};
};
diff --git a/ydb/library/yql/providers/solomon/expr_nodes/yql_solomon_expr_nodes.json b/ydb/library/yql/providers/solomon/expr_nodes/yql_solomon_expr_nodes.json
index 5c55f4d359c..8ebb3e051fd 100644
--- a/ydb/library/yql/providers/solomon/expr_nodes/yql_solomon_expr_nodes.json
+++ b/ydb/library/yql/providers/solomon/expr_nodes/yql_solomon_expr_nodes.json
@@ -61,7 +61,8 @@
{"Index": 10, "Name": "DownsamplingDisabled", "Type": "TCoBool"},
{"Index": 11, "Name": "DownsamplingAggregation", "Type": "TCoAtom"},
{"Index": 12, "Name": "DownsamplingFill", "Type": "TCoAtom"},
- {"Index": 13, "Name": "DownsamplingGridSec", "Type": "TCoUint32"}
+ {"Index": 13, "Name": "DownsamplingGridSec", "Type": "TCoUint32"},
+ {"Index": 14, "Name": "RequiredLabelNames", "Type": "TCoAtomList"}
]
},
{
@@ -83,8 +84,9 @@
{"Index": 2, "Name": "Object", "Type": "TSoObject"},
{"Index": 3, "Name": "SystemColumns", "Type": "TCoAtomList"},
{"Index": 4, "Name": "LabelNames", "Type": "TCoAtomList"},
- {"Index": 5, "Name": "RowType", "Type": "TExprBase"},
- {"Index": 6, "Name": "ColumnOrder", "Type": "TExprBase", "Optional": true}
+ {"Index": 5, "Name": "RequiredLabelNames", "Type": "TCoAtomList"},
+ {"Index": 6, "Name": "RowType", "Type": "TExprBase"},
+ {"Index": 7, "Name": "ColumnOrder", "Type": "TExprBase", "Optional": true}
]
},
{
diff --git a/ydb/library/yql/providers/solomon/proto/dq_solomon_shard.proto b/ydb/library/yql/providers/solomon/proto/dq_solomon_shard.proto
index e8ceda42c38..7d7393ca063 100644
--- a/ydb/library/yql/providers/solomon/proto/dq_solomon_shard.proto
+++ b/ydb/library/yql/providers/solomon/proto/dq_solomon_shard.proto
@@ -66,4 +66,5 @@ message TDqSolomonSource {
repeated string SystemColumns = 12;
repeated string LabelNames = 13;
map<string, string> Settings = 14;
+ repeated string RequiredLabelNames = 15;
}
diff --git a/ydb/library/yql/providers/solomon/proto/metrics_queue.proto b/ydb/library/yql/providers/solomon/proto/metrics_queue.proto
index 289630e59c7..08c9b68646c 100644
--- a/ydb/library/yql/providers/solomon/proto/metrics_queue.proto
+++ b/ydb/library/yql/providers/solomon/proto/metrics_queue.proto
@@ -4,13 +4,23 @@ package NYql.NSo.MetricQueue;
import "ydb/library/yql/dq/actors/protos/dq_events.proto";
+message TEvUpdateConsumersCount {
+ uint64 ConsumersCountDelta = 1;
+
+ optional NYql.NDqProto.TMessageTransportMeta TransportMeta = 100;
+}
+
+message TEvAck {
+ optional NYql.NDqProto.TMessageTransportMeta TransportMeta = 100;
+}
+
message TEvGetNextBatch {
optional NYql.NDqProto.TMessageTransportMeta TransportMeta = 100;
}
message TEvMetricsBatch {
bool NoMoreMetrics = 1;
- repeated TMetricLabels Metrics = 2;
+ repeated TMetric Metrics = 2;
optional NYql.NDqProto.TMessageTransportMeta TransportMeta = 100;
}
@@ -21,7 +31,7 @@ message TEvMetricsReadError {
optional NYql.NDqProto.TMessageTransportMeta TransportMeta = 100;
}
-message TMetricLabels {
- string Type = 1;
- map<string, string> Labels = 2;
+message TMetric {
+ map<string, string> Labels = 1;
+ string Type = 2;
}
diff --git a/ydb/library/yql/providers/solomon/provider/ya.make b/ydb/library/yql/providers/solomon/provider/ya.make
index 52fcdb10392..016593b68c1 100644
--- a/ydb/library/yql/providers/solomon/provider/ya.make
+++ b/ydb/library/yql/providers/solomon/provider/ya.make
@@ -26,6 +26,7 @@ PEERDIR(
ydb/library/yql/providers/solomon/expr_nodes
ydb/library/yql/providers/solomon/proto
ydb/library/yql/providers/solomon/scheme
+ ydb/library/yql/providers/solomon/solomon_accessor/client
ydb/public/sdk/cpp/src/client/types/credentials
yql/essentials/core/dq_integration
yql/essentials/providers/common/config
diff --git a/ydb/library/yql/providers/solomon/provider/yql_solomon_config.cpp b/ydb/library/yql/providers/solomon/provider/yql_solomon_config.cpp
index 3cc884e845b..3cb7e646688 100644
--- a/ydb/library/yql/providers/solomon/provider/yql_solomon_config.cpp
+++ b/ydb/library/yql/providers/solomon/provider/yql_solomon_config.cpp
@@ -11,7 +11,6 @@ TSolomonConfiguration::TSolomonConfiguration()
REGISTER_SETTING(*this, MetricsQueuePrefetchSize);
REGISTER_SETTING(*this, MetricsQueueBatchCountLimit);
REGISTER_SETTING(*this, SolomonClientDefaultReplica);
- REGISTER_SETTING(*this, MaxInflightDataRequests);
REGISTER_SETTING(*this, ComputeActorBatchSize);
}
diff --git a/ydb/library/yql/providers/solomon/provider/yql_solomon_config.h b/ydb/library/yql/providers/solomon/provider/yql_solomon_config.h
index 7fbc83f2469..fa644983900 100644
--- a/ydb/library/yql/providers/solomon/provider/yql_solomon_config.h
+++ b/ydb/library/yql/providers/solomon/provider/yql_solomon_config.h
@@ -15,7 +15,6 @@ struct TSolomonSettings {
NCommon::TConfSetting<ui64, false> MetricsQueuePrefetchSize;
NCommon::TConfSetting<ui64, false> MetricsQueueBatchCountLimit;
NCommon::TConfSetting<TString, false> SolomonClientDefaultReplica;
- NCommon::TConfSetting<ui64, false> MaxInflightDataRequests;
NCommon::TConfSetting<ui64, false> ComputeActorBatchSize;
};
diff --git a/ydb/library/yql/providers/solomon/provider/yql_solomon_datasource_type_ann.cpp b/ydb/library/yql/providers/solomon/provider/yql_solomon_datasource_type_ann.cpp
index 0c4d3956bdb..b4383ec6504 100644
--- a/ydb/library/yql/providers/solomon/provider/yql_solomon_datasource_type_ann.cpp
+++ b/ydb/library/yql/providers/solomon/provider/yql_solomon_datasource_type_ann.cpp
@@ -34,7 +34,7 @@ public:
}
TStatus HandleSoSourceSettings(const TExprNode::TPtr& input, TExprContext& ctx) {
- if (!EnsureArgsCount(*input, 14, ctx)) {
+ if (!EnsureArgsCount(*input, 15, ctx)) {
return TStatus::Error;
}
@@ -66,6 +66,11 @@ public:
if (!EnsureTupleOfAtoms(labelNames, ctx)) {
return TStatus::Error;
}
+
+ auto& requiredLabelNames = *input->Child(TSoSourceSettings::idx_RequiredLabelNames);
+ if (!EnsureTupleOfAtoms(requiredLabelNames, ctx)) {
+ return TStatus::Error;
+ }
auto& from = *input->Child(TSoSourceSettings::idx_From);
if (!EnsureAtom(from, ctx) || !ValidateDatetimeFormat("from", from, ctx)) {
diff --git a/ydb/library/yql/providers/solomon/provider/yql_solomon_dq_integration.cpp b/ydb/library/yql/providers/solomon/provider/yql_solomon_dq_integration.cpp
index 5960203606b..afe43910ee9 100644
--- a/ydb/library/yql/providers/solomon/provider/yql_solomon_dq_integration.cpp
+++ b/ydb/library/yql/providers/solomon/provider/yql_solomon_dq_integration.cpp
@@ -13,6 +13,7 @@
#include <ydb/library/yql/providers/solomon/actors/dq_solomon_metrics_queue.h>
#include <ydb/library/yql/providers/solomon/expr_nodes/yql_solomon_expr_nodes.h>
#include <ydb/library/yql/providers/solomon/proto/dq_solomon_shard.pb.h>
+#include <ydb/library/yql/providers/solomon/solomon_accessor/client/solomon_accessor_client.h>
#include <util/string/builder.h>
@@ -82,9 +83,20 @@ public:
{
}
- ui64 Partition(const TExprNode& node, TVector<TString>& partitions, TString*, TExprContext&, const TPartitionSettings&) override {
- Y_UNUSED(node);
- partitions.push_back("zz_partition");
+ ui64 Partition(const TExprNode& node, TVector<TString>& partitions, TString*, TExprContext&, const TPartitionSettings& settings) override {
+ const TDqSource dqSource(&node);
+
+ if (const auto maybeSettings = dqSource.Settings().Maybe<TSoSourceSettings>()) {
+ const auto soSourceSettings = maybeSettings.Cast();
+ if (!soSourceSettings.Selectors().StringValue().empty()) {
+ for (size_t i = 0; i < settings.MaxPartitions; ++i) {
+ partitions.push_back(TStringBuilder() << "partition" << i);
+ }
+ return 0;
+ }
+ }
+
+ partitions.push_back("partition");
return 0;
}
@@ -107,20 +119,18 @@ public:
const auto& clusterName = soReadObject.DataSource().Cluster().StringValue();
const auto token = "cluster:default_" + clusterName;
- YQL_CLOG(INFO, ProviderS3) << "Wrap " << read->Content() << " with token: " << token;
+ YQL_CLOG(INFO, ProviderSolomon) << "Wrap " << read->Content() << " with token: " << token;
auto settings = soReadObject.Object().Settings();
auto& settingsRef = settings.Ref();
- const auto now = TInstant::Now();
- const auto now1h = now - TDuration::Hours(1);
- TString from = now1h.ToStringUpToSeconds();
- TString to = now.ToStringUpToSeconds();
+ TInstant from = TInstant::Now() - TDuration::Hours(1);
+ TInstant to = TInstant::Now();
TString program;
TString selectors;
- bool downsamplingDisabled = false;
- TString downsamplingAggregation = "AVG";
- TString downsamplingFill = "PREVIOUS";
- ui32 downsamplingGridSec = 15;
+ std::optional<bool> downsamplingDisabled;
+ std::optional<TString> downsamplingAggregation;
+ std::optional<TString> downsamplingFill;
+ std::optional<ui32> downsamplingGridSec;
for (auto i = 0U; i < settingsRef.ChildrenSize(); ++i) {
if (settingsRef.Child(i)->Head().IsAtom("from"sv)) {
@@ -128,8 +138,10 @@ public:
if (!ExtractSettingValue(settingsRef.Child(i)->Tail(), settingsRef.Child(i)->Head().Content(), ctx, value)) {
return {};
}
-
- from = value;
+ if (!TInstant::TryParseIso8601(value, from)) {
+ ctx.AddError(TIssue(ctx.GetPosition(settingsRef.Child(i)->Head().Pos()), "couldn't parse `from`, use Iso8601 format, e.g. 2025-03-12T14:40:39Z"));
+ return {};
+ }
continue;
}
if (settingsRef.Child(i)->Head().IsAtom("to"sv)) {
@@ -137,8 +149,10 @@ public:
if (!ExtractSettingValue(settingsRef.Child(i)->Tail(), settingsRef.Child(i)->Head().Content(), ctx, value)) {
return {};
}
-
- to = value;
+ if (!TInstant::TryParseIso8601(value, to)) {
+ ctx.AddError(TIssue(ctx.GetPosition(settingsRef.Child(i)->Head().Pos()), "couldn't parse `to`, use Iso8601 format, e.g. 2025-03-12T14:40:39Z"));
+ return {};
+ }
continue;
}
if (settingsRef.Child(i)->Head().IsAtom("program"sv)) {
@@ -164,10 +178,13 @@ public:
if (!ExtractSettingValue(settingsRef.Child(i)->Tail(), settingsRef.Child(i)->Head().Content(), ctx, value)) {
return {};
}
- if (!TryFromString<bool>(value, downsamplingDisabled)) {
+ bool boolValue;
+ if (!TryFromString<bool>(value, boolValue)) {
ctx.AddError(TIssue(ctx.GetPosition(settingsRef.Child(i)->Head().Pos()), TStringBuilder() << "downsampling.disabled must be true or false, but has " << value));
return {};
}
+
+ downsamplingDisabled = boolValue;
continue;
}
if (settingsRef.Child(i)->Head().IsAtom("downsampling.aggregation"sv)) {
@@ -212,6 +229,24 @@ public:
return {};
}
+ if (downsamplingDisabled.has_value() && *downsamplingDisabled) {
+ if (downsamplingAggregation || downsamplingFill || downsamplingGridSec) {
+ ctx.AddError(TIssue(ctx.GetPosition(settingsRef.Pos()), "downsampling.disabled must be false if downsampling.aggregation, downsampling.fill or downsamplig.grid_interval is specified"));
+ return {};
+ }
+ } else {
+ downsamplingDisabled = false;
+ if (!downsamplingAggregation) {
+ downsamplingAggregation = "AVG";
+ }
+ if (!downsamplingFill) {
+ downsamplingFill = "PREVIOUS";
+ }
+ if (!downsamplingGridSec) {
+ downsamplingGridSec = 15;
+ }
+ }
+
return Build<TDqSourceWrap>(ctx, read->Pos())
.Input<TSoSourceSettings>()
.World(soReadObject.World())
@@ -222,14 +257,15 @@ public:
.RowType(soReadObject.RowType())
.SystemColumns(soReadObject.SystemColumns())
.LabelNames(soReadObject.LabelNames())
- .From<TCoAtom>().Build(from)
- .To<TCoAtom>().Build(to)
+ .RequiredLabelNames(soReadObject.RequiredLabelNames())
+ .From<TCoAtom>().Build(from.ToStringUpToSeconds())
+ .To<TCoAtom>().Build(to.ToStringUpToSeconds())
.Selectors<TCoAtom>().Build(selectors)
.Program<TCoAtom>().Build(program)
- .DownsamplingDisabled<TCoBool>().Literal().Build(downsamplingDisabled ? "true" : "false").Build()
- .DownsamplingAggregation<TCoAtom>().Build(downsamplingAggregation)
- .DownsamplingFill<TCoAtom>().Build(downsamplingFill)
- .DownsamplingGridSec<TCoUint32>().Literal().Build(ToString(downsamplingGridSec)).Build()
+ .DownsamplingDisabled<TCoBool>().Literal().Build(*downsamplingDisabled ? "true" : "false").Build()
+ .DownsamplingAggregation<TCoAtom>().Build(downsamplingAggregation ? *downsamplingAggregation : "")
+ .DownsamplingFill<TCoAtom>().Build(downsamplingFill ? *downsamplingFill : "")
+ .DownsamplingGridSec<TCoUint32>().Literal().Build(ToString(downsamplingGridSec ? *downsamplingGridSec : 0)).Build()
.Build()
.DataSource(soReadObject.DataSource().Cast<TCoDataSource>())
.RowType(soReadObject.RowType())
@@ -243,7 +279,7 @@ public:
return TSoWrite::Match(&write);
}
- void FillSourceSettings(const TExprNode& node, ::google::protobuf::Any& protoSettings, TString& sourceType, size_t, TExprContext&) override {
+ void FillSourceSettings(const TExprNode& node, ::google::protobuf::Any& protoSettings, TString& sourceType, size_t maxTasksPerStage, TExprContext&) override {
const TDqSource dqSource(&node);
const auto maybeSettings = dqSource.Settings().Maybe<TSoSourceSettings>();
if (!maybeSettings) {
@@ -304,35 +340,37 @@ public:
source.AddLabelNames(columnAsString);
}
+ for (const auto& c : settings.RequiredLabelNames()) {
+ const auto& labelAsString = c.StringValue();
+ source.AddRequiredLabelNames(labelAsString);
+ }
+
auto& solomonSettings = State_->Configuration;
- auto metricsQueuePageSize = solomonSettings->MetricsQueuePageSize.Get().OrElse(2000);
+ auto metricsQueuePageSize = solomonSettings->MetricsQueuePageSize.Get().OrElse(5000);
sourceSettings.insert({"metricsQueuePageSize", ToString(metricsQueuePageSize)});
- auto metricsQueuePrefetchSize = solomonSettings->MetricsQueuePrefetchSize.Get().OrElse(2000);
+ auto metricsQueuePrefetchSize = solomonSettings->MetricsQueuePrefetchSize.Get().OrElse(10000);
sourceSettings.insert({"metricsQueuePrefetchSize", ToString(metricsQueuePrefetchSize)});
- auto metricsQueueBatchCountLimit = solomonSettings->MetricsQueueBatchCountLimit.Get().OrElse(1000);
+ auto metricsQueueBatchCountLimit = solomonSettings->MetricsQueueBatchCountLimit.Get().OrElse(250);
sourceSettings.insert({"metricsQueueBatchCountLimit", ToString(metricsQueueBatchCountLimit)});
auto solomonClientDefaultReplica = solomonSettings->SolomonClientDefaultReplica.Get().OrElse("sas");
sourceSettings.insert({"solomonClientDefaultReplica", ToString(solomonClientDefaultReplica)});
- auto maxInflightDataRequests = solomonSettings->MaxInflightDataRequests.Get().OrElse(100);
- sourceSettings.insert({"maxInflightDataRequests", ToString(maxInflightDataRequests)});
-
- auto computeActorBatchSize = solomonSettings->ComputeActorBatchSize.Get().OrElse(10000);
+ auto computeActorBatchSize = solomonSettings->ComputeActorBatchSize.Get().OrElse(1000);
sourceSettings.insert({"computeActorBatchSize", ToString(computeActorBatchSize)});
if (!selectors.empty()) {
- NDq::TDqSolomonReadParams readParams{ .Source = source };
-
auto providerFactory = CreateCredentialsProviderFactoryForStructuredToken(State_->CredentialsFactory, State_->Configuration->Tokens.at(cluster));
auto credentialsProvider = providerFactory->CreateProvider();
+
+ NDq::TDqSolomonReadParams readParams{ .Source = source };
auto metricsQueueActor = NActors::TActivationContext::ActorSystem()->Register(
NDq::CreateSolomonMetricsQueueActor(
- 1,
+ maxTasksPerStage,
readParams,
credentialsProvider
),
diff --git a/ydb/library/yql/providers/solomon/provider/yql_solomon_io_discovery.cpp b/ydb/library/yql/providers/solomon/provider/yql_solomon_io_discovery.cpp
index a1ea4c46062..0a6a3c2ac2b 100644
--- a/ydb/library/yql/providers/solomon/provider/yql_solomon_io_discovery.cpp
+++ b/ydb/library/yql/providers/solomon/provider/yql_solomon_io_discovery.cpp
@@ -166,6 +166,7 @@ public:
.Object(soObject)
.SystemColumns(systemColumnsNode)
.LabelNames(labelNamesNode)
+ .RequiredLabelNames().Build()
.RowType(rowTypeNode)
.ColumnOrder(std::move(userSchema.back()))
.Done().Ptr()
@@ -175,6 +176,7 @@ public:
.Object(soObject)
.SystemColumns(systemColumnsNode)
.LabelNames(labelNamesNode)
+ .RequiredLabelNames().Build()
.RowType(rowTypeNode)
.Done().Ptr();
}
diff --git a/ydb/library/yql/providers/solomon/provider/yql_solomon_load_meta.cpp b/ydb/library/yql/providers/solomon/provider/yql_solomon_load_meta.cpp
index 712877a6f23..67982a34eb9 100644
--- a/ydb/library/yql/providers/solomon/provider/yql_solomon_load_meta.cpp
+++ b/ydb/library/yql/providers/solomon/provider/yql_solomon_load_meta.cpp
@@ -1,9 +1,40 @@
#include "yql_solomon_provider_impl.h"
+#include <ydb/library/yql/providers/dq/expr_nodes/dqs_expr_nodes.h>
+#include <ydb/library/yql/providers/solomon/expr_nodes/yql_solomon_expr_nodes.h>
+#include <ydb/library/yql/providers/solomon/solomon_accessor/client/solomon_accessor_client.h>
+#include <yql/essentials/core/yql_expr_optimize.h>
+#include <yql/essentials/providers/common/provider/yql_provider_names.h>
+
namespace NYql {
using namespace NNodes;
+namespace {
+
+TMaybe<TString> ExtractSetting(const TExprNode& settings, const TString& settingName) {
+ for (size_t i = 0U; i < settings.ChildrenSize(); ++i) {
+ if (settings.Child(i)->Head().IsAtom(settingName)) {
+ return TString(settings.Child(i)->Tail().Content());
+ }
+ }
+
+ return {};
+}
+
+NSo::NProto::ESolomonClusterType MapClusterType(TSolomonClusterConfig::ESolomonClusterType clusterType) {
+ switch (clusterType) {
+ case TSolomonClusterConfig::SCT_SOLOMON:
+ return NSo::NProto::ESolomonClusterType::CT_SOLOMON;
+ case TSolomonClusterConfig::SCT_MONITORING:
+ return NSo::NProto::ESolomonClusterType::CT_MONITORING;
+ default:
+ YQL_ENSURE(false, "Invalid cluster type " << ToString<ui32>(clusterType));
+ }
+}
+
+} // namespace
+
class TSolomonLoadTableMetadataTransformer : public TGraphTransformerBase {
public:
TSolomonLoadTableMetadataTransformer(TSolomonState::TPtr state)
@@ -12,25 +43,96 @@ public:
}
TStatus DoTransform(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) final {
- Y_UNUSED(input);
- Y_UNUSED(output);
-
+ output = input;
if (ctx.Step.IsDone(TExprStep::LoadTablesMetadata)) {
return TStatus::Ok;
}
- return TStatus::Ok;
+ auto nodes = FindNodes(input, [&](const TExprNode::TPtr& node) -> bool {
+ if (const auto maybeRead = TMaybeNode<TSoReadObject>(node)) {
+ TSoReadObject read(node);
+ auto& settings = read.Object().Settings().Ref();
+ if (read.RequiredLabelNames().Empty() && ExtractSetting(settings, "selectors")) {
+ return true;
+ }
+ }
+ return false;
+ });
+
+ std::vector<NThreading::TFuture<NSo::TGetLabelsResponse>> futures;
+ futures.reserve(nodes.size());
+ for (const auto& n : nodes) {
+ TSoReadObject soReadObject(n);
+
+ const auto& clusterName = soReadObject.DataSource().Cluster().StringValue();
+ const auto* clusterDesc = State_->Configuration->ClusterConfigs.FindPtr(clusterName);
+ auto& settings = soReadObject.Object().Settings().Ref();
+
+ if (auto maybeSelectors = ExtractSetting(settings, "selectors")) {
+ NSo::NProto::TDqSolomonSource source;
+ source.SetEndpoint(clusterDesc->GetCluster());
+ source.SetProject(soReadObject.Object().Project().StringValue());
+ source.SetClusterType(MapClusterType(clusterDesc->GetClusterType()));
+ source.SetUseSsl(clusterDesc->GetUseSsl());
+
+ auto defaultReplica = State_->Configuration->SolomonClientDefaultReplica.Get().OrElse("sas");
+ source.MutableSettings()->insert({ "solomonClientDefaultReplica", ToString(defaultReplica) });
+
+ auto providerFactory = CreateCredentialsProviderFactoryForStructuredToken(State_->CredentialsFactory, State_->Configuration->Tokens.at(clusterName));
+ auto credentialsProvider = providerFactory->CreateProvider();
+
+ SolomonClient_ = NSo::ISolomonAccessorClient::Make(std::move(source), credentialsProvider);
+ auto future = SolomonClient_->GetLabelNames(*maybeSelectors);
+
+ LabelNamesRequests_[soReadObject.Raw()] = future;
+ futures.push_back(future);
+ }
+ }
+
+ if (futures.empty()) {
+ return TStatus::Ok;
+ }
+
+ AllFuture_ = NThreading::WaitExceptionOrAll(futures);
+ return TStatus::Async;
}
- NThreading::TFuture<void> DoGetAsyncFuture(const TExprNode& input) final {
- Y_UNUSED(input);
- return AsyncFuture_;
+ NThreading::TFuture<void> DoGetAsyncFuture(const TExprNode&) final {
+ return AllFuture_;
}
- TStatus DoApplyAsyncChanges(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext&) final {
- output = input;
+ TStatus DoApplyAsyncChanges(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) final {
+ AllFuture_.GetValue();
+
+ TNodeMap<NThreading::TFuture<NSo::TGetLabelsResponse>> labelNamesRequests;
+ labelNamesRequests.swap(LabelNamesRequests_);
+
+ TNodeOnNodeOwnedMap replaces;
+ for (auto& [node, request] : labelNamesRequests) {
+ auto value = request.GetValue();
+
+ if (value.Status != NSo::EStatus::STATUS_OK) {
+ ctx.AddError(TIssue(ctx.GetPosition(node->Pos()),
+ TStringBuilder() << "Failed to get label names, details: " << value.Error));
+ return TStatus::Error;
+ }
- return TStatus::Ok;
+ TSoReadObject read(node);
+ TVector<TCoAtom> labelNames;
+ for (const auto& label : value.Result.Labels) {
+ labelNames.push_back(Build<TCoAtom>(ctx, read.Pos()).Value(label).Done());
+ }
+
+ replaces.emplace(node,
+ Build<TSoReadObject>(ctx, read.Pos())
+ .InitFrom(read)
+ .RequiredLabelNames()
+ .Add(labelNames)
+ .Build()
+ .Done().Ptr());
+ }
+
+ return RemapExpr(input, output, replaces, ctx, TOptimizeExprSettings{nullptr});
}
void Rewind() final {
@@ -38,7 +140,10 @@ public:
private:
TSolomonState::TPtr State_;
- NThreading::TFuture<void> AsyncFuture_;
+ NThreading::TFuture<void> AllFuture_;
+
+ NSo::ISolomonAccessorClient::TPtr SolomonClient_;
+ TNodeMap<NThreading::TFuture<NSo::TGetLabelsResponse>> LabelNamesRequests_;
};
THolder<IGraphTransformer> CreateSolomonLoadTableMetadataTransformer(TSolomonState::TPtr state) {
diff --git a/ydb/library/yql/providers/solomon/provider/yql_solomon_provider_impl.h b/ydb/library/yql/providers/solomon/provider/yql_solomon_provider_impl.h
index 1d1753aba6b..df477bdbf38 100644
--- a/ydb/library/yql/providers/solomon/provider/yql_solomon_provider_impl.h
+++ b/ydb/library/yql/providers/solomon/provider/yql_solomon_provider_impl.h
@@ -2,6 +2,7 @@
#include "yql_solomon_provider.h"
+#include <ydb/library/yql/providers/solomon/proto/dq_solomon_shard.pb.h>
#include <yql/essentials/core/yql_graph_transformer.h>
#include <yql/essentials/providers/common/transform/yql_exec.h>
#include <yql/essentials/providers/common/transform/yql_visit.h>
diff --git a/ydb/library/yql/providers/solomon/solomon_accessor/client/solomon_accessor_client.cpp b/ydb/library/yql/providers/solomon/solomon_accessor/client/solomon_accessor_client.cpp
index 8c774ec4a8e..3dc8ad9a36c 100644
--- a/ydb/library/yql/providers/solomon/solomon_accessor/client/solomon_accessor_client.cpp
+++ b/ydb/library/yql/providers/solomon/solomon_accessor/client/solomon_accessor_client.cpp
@@ -1,13 +1,20 @@
#include "solomon_accessor_client.h"
+#include <library/cpp/json/writer/json.h>
#include <library/cpp/protobuf/interop/cast.h>
+#include <library/cpp/threading/future/wait/wait.h>
+#include <util/string/join.h>
#include <ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h>
#include <ydb/public/sdk/cpp/src/library/grpc/client/grpc_client_low.h>
#include <yql/essentials/utils/url_builder.h>
#include <yql/essentials/utils/yql_panic.h>
-#include <ydb/library/yql/providers/solomon/solomon_accessor/grpc/solomon_accessor_pb.pb.h>
-#include <ydb/library/yql/providers/solomon/solomon_accessor/grpc/solomon_accessor_pb.grpc.pb.h>
+#include <ydb/library/yql/providers/solomon/solomon_accessor/grpc/data_service.pb.h>
+#include <ydb/library/yql/providers/solomon/solomon_accessor/grpc/data_service.grpc.pb.h>
+
+#include <util/string/join.h>
+#include <util/string/strip.h>
+#include <util/string/split.h>
namespace NYql::NSo {
@@ -15,8 +22,7 @@ using namespace yandex::monitoring::api::v3;
namespace {
-Downsampling::GapFilling ParseGapFilling(const TString& fill)
-{
+Downsampling::GapFilling ParseGapFilling(const TString& fill) {
if (fill == "NULL"sv) {
return Downsampling::GAP_FILLING_NULL;
}
@@ -29,8 +35,7 @@ Downsampling::GapFilling ParseGapFilling(const TString& fill)
return Downsampling::GAP_FILLING_UNSPECIFIED;
}
-Downsampling::GridAggregation ParseGridAggregation(const TString& aggregation)
-{
+Downsampling::GridAggregation ParseGridAggregation(const TString& aggregation) {
if (aggregation == "MAX"sv) {
return Downsampling::GRID_AGGREGATION_MAX;
}
@@ -52,8 +57,7 @@ Downsampling::GridAggregation ParseGridAggregation(const TString& aggregation)
return Downsampling::GRID_AGGREGATION_UNSPECIFIED;
}
-TString MetricTypeToString(MetricType type)
-{
+TString MetricTypeToString(MetricType type) {
switch (type) {
case MetricType::DGAUGE:
return "DGAUGE";
@@ -68,106 +72,283 @@ TString MetricTypeToString(MetricType type)
}
}
-class TSolomonAccessorClient : public ISolomonAccessorClient, public std::enable_shared_from_this<TSolomonAccessorClient>
-{
+std::vector<TString> ParseKnownLabelNames(const TString& selectors) {
+ auto selectorValues = StringSplitter(selectors.substr(1, selectors.size() - 2)).Split(',').SkipEmpty().ToList<TString>();
+ std::vector<TString> result;
+ result.reserve(selectorValues.size());
+ for (const auto& value : selectorValues) {
+ result.push_back(StripString(value.substr(0, value.find('='))));
+ }
+ return result;
+}
+
+TGetLabelsResponse ProcessGetLabelsResponse(NYql::IHTTPGateway::TResult&& response, std::vector<TString>&& knownLabelNames) {
+ TGetLabelsResult result;
+
+ if (response.CurlResponseCode != CURLE_OK) {
+ return TGetLabelsResponse(TStringBuilder{} << "Error while sending list metric names request to monitoring api: " << response.Issues.ToOneLineString());
+ }
+
+ if (response.Content.HttpResponseCode < 200 || response.Content.HttpResponseCode >= 300) {
+ return TGetLabelsResponse(TStringBuilder{} << "Error while sending list metric names request to monitoring api: " << response.Content.data());
+ }
+
+ NJson::TJsonValue json;
+ try {
+ NJson::ReadJsonTree(response.Content.data(), &json, /*throwOnError*/ true);
+ } catch (const std::exception& e) {
+ return TGetLabelsResponse(TStringBuilder{} << "Failed to parse response from monitoring api: " << e.what());
+ }
+
+ if (!json.IsMap() || !json.Has("names") || !json["names"].IsArray()) {
+ return TGetLabelsResponse("Invalid result from monitoring api");
+ }
+
+ const auto names = json["names"].GetArray();
+
+ for (const auto& name : names) {
+ if (!name.IsString()) {
+ return TGetLabelsResponse("Invalid label names from monitoring api");
+ } else {
+ result.Labels.push_back(name.GetString());
+ }
+ }
+ result.Labels.insert(result.Labels.end(), knownLabelNames.begin(), knownLabelNames.end());
+
+ return TGetLabelsResponse(std::move(result));
+}
+
+TListMetricsResponse ProcessListMetricsResponse(NYql::IHTTPGateway::TResult&& response) {
+ TListMetricsResult result;
+
+ if (response.CurlResponseCode != CURLE_OK) {
+ return TListMetricsResponse(TStringBuilder{} << "Error while sending list metrics request to monitoring api: " << response.Issues.ToOneLineString());
+ }
+
+ if (response.Content.HttpResponseCode < 200 || response.Content.HttpResponseCode >= 300) {
+ return TListMetricsResponse(TStringBuilder{} << "Error while sending list metrics request to monitoring api: " << response.Content.data());
+ }
+
+ NJson::TJsonValue json;
+ try {
+ NJson::ReadJsonTree(response.Content.data(), &json, /*throwOnError*/ true);
+ } catch (const std::exception& e) {
+ return TListMetricsResponse(TStringBuilder{} << "Failed to parse response from monitoring api: " << e.what());
+ }
+
+ if (!json.IsMap() || !json.Has("result") || !json.Has("page")) {
+ return TListMetricsResponse("Invalid list metrics result from monitoring api");
+ }
+
+ const auto pagesInfo = json["page"];
+ if (!pagesInfo.IsMap() || !pagesInfo.Has("pagesCount") || !pagesInfo["pagesCount"].IsInteger()) {
+ return TListMetricsResponse("Invalid paging info from monitoring api");
+ }
+
+ result.PagesCount = pagesInfo["pagesCount"].GetInteger();
+
+ for (const auto& metricObj : json["result"].GetArray()) {
+ if (!metricObj.IsMap() || !metricObj.Has("labels") || !metricObj["labels"].IsMap() || !metricObj.Has("type") || !metricObj["type"].IsString()) {
+ return TListMetricsResponse("Invalid list metrics result from monitoring api");
+ }
+
+ std::map<TString, TString> metricLabels;
+ for (const auto& [key, value] : metricObj["labels"].GetMap()) {
+ metricLabels[key] = value.GetString();
+ }
+
+ result.Metrics.emplace_back(std::move(metricLabels), metricObj["type"].GetString());
+ }
+
+ return TListMetricsResponse(std::move(result));
+}
+
+TGetPointsCountResponse ProcessGetPointsCountResponse(NYql::IHTTPGateway::TResult&& response, size_t expectedSize) {
+ TGetPointsCountResult result;
+
+ if (response.CurlResponseCode != CURLE_OK) {
+ return TGetPointsCountResponse(TStringBuilder() << "Error while sending points count request to monitoring api: " << response.Issues.ToOneLineString());
+ }
+
+ if (response.Content.HttpResponseCode < 200 || response.Content.HttpResponseCode >= 300) {
+ return TGetPointsCountResponse(TStringBuilder{} << "Error while sending points count request to monitoring api: " << response.Content.data());
+ }
+
+ NJson::TJsonValue json;
+ try {
+ NJson::ReadJsonTree(response.Content.data(), &json, /*throwOnError*/ true);
+ } catch (const std::exception& e) {
+ return TGetPointsCountResponse(TStringBuilder{} << "Failed to parse points count response from monitoring api: " << e.what());
+ }
+
+ if (!json.Has("vector") || !json["vector"].IsArray()) {
+ return TGetPointsCountResponse("Invalid points count result from monitoring api");
+ }
+
+ const auto counts = json["vector"].GetArray();
+ if (counts.size() != expectedSize) {
+ return TGetPointsCountResponse("Invalid points count response size from monitoring api");
+ }
+
+ for (size_t i = 0; i < counts.size(); ++i) {
+ if (!counts[i].IsMap() || !counts[i].Has("scalar") || !counts[i].GetMap().at("scalar").IsInteger()) {
+ return TGetPointsCountResponse("Invalid points count response format from monitoring api");
+ }
+ result.PointsCount.push_back(counts[i].GetMap().at("scalar").GetInteger());
+ }
+
+ return TGetPointsCountResponse(std::move(result));
+}
+
+TGetDataResponse ProcessGetDataResponse(NYdbGrpc::TGrpcStatus&& status, ReadResponse&& response) {
+ TGetDataResult result;
+
+ if (!status.Ok()) {
+ if (status.GRpcStatusCode == grpc::StatusCode::RESOURCE_EXHAUSTED) {
+ return TGetDataResponse();
+ }
+ return TGetDataResponse(TStringBuilder{} << "Error while sending data request to monitoring api: " << status.Msg);
+ }
+
+ if (response.response_per_query_size() != 1) {
+ return TGetDataResponse("Invalid get data repsonse size from monitoring api");
+ }
+
+ const auto& responseValue = response.response_per_query()[0];
+ YQL_ENSURE(responseValue.has_timeseries_vector());
+ for (const auto& queryResponse : responseValue.timeseries_vector().values()) {
+ auto type = MetricTypeToString(queryResponse.type());
+
+ std::map<TString, TString> labels(queryResponse.labels().begin(), queryResponse.labels().end());
+ std::vector<int64_t> timestamps(queryResponse.timestamp_values().values().begin(), queryResponse.timestamp_values().values().end());
+ std::vector<double> values(queryResponse.double_values().values().begin(), queryResponse.double_values().values().end());
+
+ TMetric metric {
+ .Labels = labels,
+ .Type = type,
+ };
+
+ result.Timeseries.emplace_back(std::move(metric), std::move(timestamps), std::move(values));
+ }
+
+ return TGetDataResponse(std::move(result));
+}
+
+class TSolomonAccessorClient : public ISolomonAccessorClient, public std::enable_shared_from_this<TSolomonAccessorClient> {
public:
TSolomonAccessorClient(
const TString& defaultReplica,
const ui64 defaultGrpcPort,
- const NYql::NSo::NProto::TDqSolomonSource& settings,
- std::shared_ptr<NYdb::ICredentialsProvider> credentialsProvider
- )
+ NYql::NSo::NProto::TDqSolomonSource&& settings,
+ std::shared_ptr<NYdb::ICredentialsProvider> credentialsProvider)
: DefaultReplica(defaultReplica)
, DefaultGrpcPort(defaultGrpcPort)
- , Settings(settings)
- , CredentialsProvider(credentialsProvider)
- , HttpGateway(IHTTPGateway::Make())
- , GrpcClient(std::make_shared<NYdbGrpc::TGRpcClientLow>())
- {
+ , Settings(std::move(settings))
+ , CredentialsProvider(credentialsProvider) {
+
+ HttpConfig.SetMaxInFlightCount(HttpMaxInflight);
+ HttpGateway = IHTTPGateway::Make(&HttpConfig);
+
GrpcConfig.Locator = GetGrpcSolomonEndpoint();
GrpcConfig.EnableSsl = Settings.GetUseSsl();
+ GrpcConfig.MaxInFlight = GrpcMaxInflight;
+ GrpcClient = std::make_shared<NYdbGrpc::TGRpcClientLow>();
+ GrpcConnection = GrpcClient->CreateGRpcServiceConnection<DataService>(GrpcConfig);
+ }
+
+ ~TSolomonAccessorClient() override {
+ GrpcClient->Stop();
}
public:
- NThreading::TFuture<TListMetricsResult> ListMetrics(const TString& selectors, int pageSize, int page) const override final
- {
- const auto request = BuildListMetricsRequest(selectors, pageSize, page);
+ NThreading::TFuture<TGetLabelsResponse> GetLabelNames(const TString& selectors) const override final {
+ auto resultPromise = NThreading::NewPromise<TGetLabelsResponse>();
+
+ auto cb = [resultPromise, selectors](NYql::IHTTPGateway::TResult&& result) mutable {
+ resultPromise.SetValue(ProcessGetLabelsResponse(std::move(result), ParseKnownLabelNames(selectors)));
+ };
- IHTTPGateway::THeaders headers;
- if (auto authInfo = GetAuthInfo()) {
- headers.Fields.emplace_back(TStringBuilder{} << "Authorization: " << *authInfo);
- }
+ DoHttpRequest(
+ std::move(cb),
+ BuildGetLabelsUrl(selectors)
+ );
+
+ return resultPromise.GetFuture();
+ }
- auto resultPromise = NThreading::NewPromise<TListMetricsResult>();
+ NThreading::TFuture<TListMetricsResponse> ListMetrics(const TString& selectors, int pageSize, int page) const override final {
+ auto resultPromise = NThreading::NewPromise<TListMetricsResponse>();
- std::weak_ptr<const TSolomonAccessorClient> weakSelf = shared_from_this();
- // hold context until reply
- auto cb = [weakSelf, resultPromise](NYql::IHTTPGateway::TResult&& result) mutable
- {
- if (auto self = weakSelf.lock()) {
- resultPromise.SetValue(self->ProcessHttpResponse(std::move(result)));
- } else {
- resultPromise.SetValue(TListMetricsResult("Client has been shut down"));
- }
+ auto cb = [resultPromise](NYql::IHTTPGateway::TResult&& result) mutable {
+ resultPromise.SetValue(ProcessListMetricsResponse(std::move(result)));
+ };
+
+ DoHttpRequest(
+ std::move(cb),
+ BuildListMetricsUrl(selectors, pageSize, page)
+ );
+
+ return resultPromise.GetFuture();
+ }
+
+ NThreading::TFuture<TGetPointsCountResponse> GetPointsCount(const std::vector<TMetric>& metrics) const override final {
+ auto requestUrl = BuildGetPointsCountUrl();
+ auto requestBody = BuildGetPointsCountBody(metrics);
+
+ auto resultPromise = NThreading::NewPromise<TGetPointsCountResponse>();
+
+ auto cb = [resultPromise, expectedSize = metrics.size()](NYql::IHTTPGateway::TResult&& response) mutable {
+ resultPromise.SetValue(ProcessGetPointsCountResponse(std::move(response), expectedSize));
};
- HttpGateway->Download(
- request,
- headers,
- 0,
- ListSizeLimit,
- std::move(cb)
+ DoHttpRequest(
+ std::move(cb),
+ std::move(requestUrl),
+ std::move(requestBody)
);
return resultPromise.GetFuture();
}
- NThreading::TFuture<TGetDataResult> GetData(const std::vector<TString>& selectors) const override final
- {
- const auto request = BuildGetDataRequest(selectors);
+ NThreading::TFuture<TGetDataResponse> GetData(TMetric metric, TInstant from, TInstant to) const override final {
+ return GetData(BuildSelectorsProgram(metric), from, to);
+ }
+
+ NThreading::TFuture<TGetDataResponse> GetData(TString selectors, TInstant from, TInstant to) const override final {
+ const auto request = BuildGetDataRequest(selectors, from, to);
NYdbGrpc::TCallMeta callMeta;
if (auto authInfo = GetAuthInfo()) {
callMeta.Aux.emplace_back("authorization", *authInfo);
}
+ callMeta.Aux.emplace_back("x-client-id", "yandex-query");
- auto resultPromise = NThreading::NewPromise<TGetDataResult>();
-
- const auto connection = GrpcClient->CreateGRpcServiceConnection<DataService>(GrpcConfig);
+ auto resultPromise = NThreading::NewPromise<TGetDataResponse>();
auto context = GrpcClient->CreateContext();
if (!context) {
- resultPromise.SetValue(TGetDataResult("Client is being shutted down"));
+ resultPromise.SetValue(TGetDataResponse("Client is being shutted down"));
return resultPromise.GetFuture();
}
- std::weak_ptr<const TSolomonAccessorClient> weakSelf = shared_from_this();
// hold context until reply
- auto cb = [weakSelf, resultPromise, context](
- NYdbGrpc::TGrpcStatus&& status,
- ReadResponse&& result) mutable
- {
- if (auto self = weakSelf.lock()) {
- resultPromise.SetValue(self->ProcessGrpcResponse(std::move(status), std::move(result)));
- } else {
- resultPromise.SetValue(TGetDataResult("Client has been shut down"));
- }
+ auto cb = [resultPromise, context](NYdbGrpc::TGrpcStatus&& status, ReadResponse&& result) mutable {
+ resultPromise.SetValue(ProcessGetDataResponse(std::move(status), std::move(result)));
};
- connection->DoRequest<ReadRequest, ReadResponse>(
- std::move(request),
- std::move(cb),
- &DataService::Stub::AsyncRead,
- callMeta,
- context.get()
- );
+ GrpcConnection->DoRequest<ReadRequest, ReadResponse>(
+ std::move(request),
+ std::move(cb),
+ &DataService::Stub::AsyncRead,
+ callMeta,
+ context.get()
+ );
return resultPromise.GetFuture();
}
private:
- TMaybe<TString> GetAuthInfo() const
- {
+ std::optional<TString> GetAuthInfo() const {
if (!Settings.GetUseSsl()) {
return {};
}
@@ -184,18 +365,75 @@ private:
}
}
- TString GetHttpSolomonEndpoint() const
- {
+ TString GetHttpSolomonEndpoint() const {
return (Settings.GetUseSsl() ? "https://" : "http://") + Settings.GetEndpoint();
}
- TString GetGrpcSolomonEndpoint() const
- {
+ TString GetGrpcSolomonEndpoint() const {
return TStringBuilder() << Settings.GetEndpoint() << ":" << DefaultGrpcPort;
}
- TString BuildListMetricsRequest(const TString& selectors, int pageSize, int page) const
- {
+ template <typename TCallback>
+ void DoHttpRequest(TCallback&& callback, TString&& url, TString&& body = "") const {
+ IHTTPGateway::THeaders headers;
+ if (auto authInfo = GetAuthInfo()) {
+ headers.Fields.emplace_back(TStringBuilder{} << "Authorization: " << *authInfo);
+ }
+ headers.Fields.emplace_back("x-client-id: yandex-query");
+ headers.Fields.emplace_back("accept: application/json;charset=UTF-8");
+ headers.Fields.emplace_back("Content-Type: application/json;charset=UTF-8");
+
+ auto retryPolicy = IHTTPGateway::TRetryPolicy::GetExponentialBackoffPolicy(
+ [](CURLcode, long httpCode) {
+ if (httpCode == 429 /*RESOURCE_EXHAUSTED*/) {
+ return ERetryErrorClass::ShortRetry;
+ }
+ return ERetryErrorClass::NoRetry;
+ },
+ TDuration::MilliSeconds(5),
+ TDuration::MilliSeconds(200),
+ TDuration::MilliSeconds(500)
+ );
+
+ if (!body.empty()) {
+ HttpGateway->Upload(
+ std::move(url),
+ std::move(headers),
+ std::move(body),
+ std::move(callback),
+ false,
+ retryPolicy
+ );
+ } else {
+ HttpGateway->Download(
+ std::move(url),
+ std::move(headers),
+ 0,
+ ListSizeLimit,
+ std::move(callback),
+ {},
+ retryPolicy
+ );
+ }
+ }
+
+ TString BuildGetLabelsUrl(const TString& selectors) const {
+ TUrlBuilder builder(GetHttpSolomonEndpoint());
+
+ builder.AddPathComponent("api");
+ builder.AddPathComponent("v2");
+ builder.AddPathComponent("projects");
+ builder.AddPathComponent(Settings.GetProject());
+ builder.AddPathComponent("sensors");
+ builder.AddPathComponent("names");
+
+ builder.AddUrlParam("selectors", selectors);
+ builder.AddUrlParam("forceCluster", DefaultReplica);
+
+ return builder.Build();
+ }
+
+ TString BuildListMetricsUrl(const TString& selectors, int pageSize, int page) const {
TUrlBuilder builder(GetHttpSolomonEndpoint());
builder.AddPathComponent("api");
@@ -208,17 +446,67 @@ private:
builder.AddUrlParam("forceCluster", DefaultReplica);
builder.AddUrlParam("pageSize", std::to_string(pageSize));
builder.AddUrlParam("page", std::to_string(page));
+ builder.AddUrlParam("from", TInstant::Seconds(Settings.GetFrom()).ToIsoStringLocalUpToSeconds());
+ builder.AddUrlParam("to", TInstant::Seconds(Settings.GetTo()).ToIsoStringLocalUpToSeconds());
+
+ return builder.Build();
+ }
+
+ TString BuildGetPointsCountUrl() const {
+ TUrlBuilder builder(GetHttpSolomonEndpoint());
+
+ builder.AddPathComponent("api");
+ builder.AddPathComponent("v2");
+ builder.AddPathComponent("projects");
+ builder.AddPathComponent(Settings.GetProject());
+ builder.AddPathComponent("sensors");
+ builder.AddPathComponent("data");
return builder.Build();
}
- ReadRequest BuildGetDataRequest(const std::vector<TString>& selectors) const
- {
+ TString BuildGetPointsCountBody(const std::vector<TMetric>& metrics) const {
+ std::vector<TString> selectors;
+ selectors.reserve(metrics.size());
+ std::transform(
+ metrics.begin(),
+ metrics.end(),
+ std::back_inserter(selectors),
+ [this](const TMetric& metric) -> TString {
+ return TStringBuilder() << "count(" << BuildSelectorsProgram(metric) << ")";
+ }
+ );
+
+ TString program = TStringBuilder() << "[" << JoinSeq(",", selectors) << "]";
+
+ const auto& ds = Settings.GetDownsampling();
+ NJsonWriter::TBuf w;
+ w.BeginObject()
+ .UnsafeWriteKey("from").WriteString(TInstant::Seconds(Settings.GetFrom()).ToString())
+ .UnsafeWriteKey("to").WriteString(TInstant::Seconds(Settings.GetTo()).ToString())
+ .UnsafeWriteKey("program").WriteString(program)
+ .UnsafeWriteKey("downsampling")
+ .BeginObject()
+ .UnsafeWriteKey("disabled").WriteBool(ds.GetDisabled());
+
+ if (!ds.GetDisabled()) {
+ w.UnsafeWriteKey("aggregation").WriteString(ds.GetAggregation())
+ .UnsafeWriteKey("fill").WriteString(ds.GetFill())
+ .UnsafeWriteKey("gridMillis").WriteLongLong(ds.GetGridMs());
+ }
+ w.EndObject()
+ .UnsafeWriteKey("forceCluster").WriteString(DefaultReplica)
+ .EndObject();
+
+ return w.Str();
+ }
+
+ ReadRequest BuildGetDataRequest(const TString& selectors, TInstant from, TInstant to) const {
ReadRequest request;
request.mutable_container()->set_project_id(Settings.GetProject());
- *request.mutable_from_time() = NProtoInterop::CastToProto(TInstant::Seconds(Settings.GetFrom()));
- *request.mutable_to_time() = NProtoInterop::CastToProto(TInstant::Seconds(Settings.GetTo()));
+ *request.mutable_from_time() = NProtoInterop::CastToProto(from);
+ *request.mutable_to_time() = NProtoInterop::CastToProto(to);
*request.mutable_force_replica() = DefaultReplica;
if (Settings.GetDownsampling().GetDisabled()) {
@@ -230,152 +518,54 @@ private:
request.mutable_downsampling()->set_gap_filling(ParseGapFilling(downsampling.GetFill()));
}
- ui64 cnt = 0;
- for (const auto& metric : selectors) {
- auto query = request.mutable_queries()->Add();
- *query->mutable_value() = metric;
- *query->mutable_name() = TStringBuilder() << "query" << cnt++;
- query->set_hidden(false);
- }
+ auto query = request.mutable_queries()->Add();
+ *query->mutable_value() = selectors;
+ *query->mutable_name() = "query";
+ query->set_hidden(false);
return request;
}
- TListMetricsResult ProcessHttpResponse(NYql::IHTTPGateway::TResult&& response) const
- {
- std::vector<TMetric> result;
-
- if (response.Content.HttpResponseCode < 200 || response.Content.HttpResponseCode >= 300) {
- return TListMetricsResult(TStringBuilder{} << "Error while sending list metrics request to monitoring api: " << response.Content.data());
- }
-
- NJson::TJsonValue json;
- try {
- NJson::ReadJsonTree(response.Content.data(), &json, /*throwOnError*/ true);
- } catch (const std::exception& e) {
- return TListMetricsResult(TStringBuilder{} << "Failed to parse response from monitoring api: " << e.what());
- }
-
- if (!json.IsMap() || !json.Has("result") || !json.Has("page")) {
- return TListMetricsResult(TStringBuilder{} << "Invalid result from monitoring api");
- }
-
- const auto pagesInfo = json["page"];
- if (!pagesInfo.IsMap() || !pagesInfo.Has("pagesCount") || !pagesInfo["pagesCount"].IsInteger()) {
- return TListMetricsResult(TStringBuilder{} << "Invalid paging info from monitoring api");
- }
-
- size_t pagesCount = pagesInfo["pagesCount"].GetInteger();
-
- for (const auto& metricObj : json["result"].GetArray()) {
- try {
- result.emplace_back(metricObj);
- } catch (const std::exception& e) {
- return TListMetricsResult(TStringBuilder{} << "Failed to parse result response from monitoring: " << e.what());
+ TString BuildSelectorsProgram(const TMetric& metric) const {
+ std::vector<TString> mappedValues;
+ mappedValues.reserve(Settings.GetRequiredLabelNames().size());
+ std::transform(
+ Settings.GetRequiredLabelNames().begin(),
+ Settings.GetRequiredLabelNames().end(),
+ std::back_inserter(mappedValues),
+ [&labels = metric.Labels](const TString& labelName) -> TString {
+ if (labels.find(labelName) == labels.end()) {
+ return TStringBuilder() << labelName << "=\"-\"";
+ }
+ return TStringBuilder() << labelName << "=\"" << labels.at(labelName) << "\"";
}
- }
-
- return TListMetricsResult(pagesCount, std::move(result));
- }
-
- TGetDataResult ProcessGrpcResponse(NYdbGrpc::TGrpcStatus&& status, ReadResponse&& response) const
- {
- std::vector<TTimeseries> result;
-
- if (!status.Ok()) {
- return TGetDataResult(TStringBuilder{} << "Error while sending data request to monitoring api: " << status.Msg);
- }
-
- for (const auto& responseValue : response.response_per_query()) {
- YQL_ENSURE(responseValue.has_timeseries_vector());
- for (const auto& queryResponse : responseValue.timeseries_vector().values()) {
- auto type = MetricTypeToString(queryResponse.type());
-
- std::map<TString, TString> labels(queryResponse.labels().begin(), queryResponse.labels().end());
- std::vector<int64_t> timestamps(queryResponse.timestamp_values().values().begin(), queryResponse.timestamp_values().values().end());
- std::vector<double> values(queryResponse.double_values().values().begin(), queryResponse.double_values().values().end());
-
- result.emplace_back(queryResponse.name(), std::move(labels), type, std::move(timestamps), std::move(values));
- }
-
- }
+ );
- return TGetDataResult(std::move(result));
+ return TStringBuilder() << "{" << JoinSeq(",", mappedValues) << "}";
}
private:
const TString DefaultReplica;
const ui64 DefaultGrpcPort;
- const size_t ListSizeLimit = 1ull << 20;
- const NYql::NSo::NProto::TDqSolomonSource& Settings;
+ const ui64 ListSizeLimit = 1ull << 20;
+ const ui64 HttpMaxInflight = 200;
+ const ui64 GrpcMaxInflight = 2000;
+ const NYql::NSo::NProto::TDqSolomonSource Settings;
const std::shared_ptr<NYdb::ICredentialsProvider> CredentialsProvider;
+ THttpGatewayConfig HttpConfig;
IHTTPGateway::TPtr HttpGateway;
NYdbGrpc::TGRpcClientConfig GrpcConfig;
+ std::unique_ptr<NYdbGrpc::TServiceConnection<DataService>> GrpcConnection;
std::shared_ptr<NYdbGrpc::TGRpcClientLow> GrpcClient;
};
} // namespace
-TMetric::TMetric(const NJson::TJsonValue& value)
-{
- YQL_ENSURE(value.IsMap());
-
- if (value.Has("labels")) {
- auto labels = value["labels"];
- YQL_ENSURE(labels.IsMap());
-
- for (const auto& [key, value] : labels.GetMapSafe()) {
- YQL_ENSURE(value.IsString());
- Labels[key] = value.GetString();
- }
- }
-
- if (value.Has("type")) {
- YQL_ENSURE(value["type"].IsString());
- Type = value["type"].GetString();
- }
-
- if (value.Has("createdAt")) {
- YQL_ENSURE(value["createdAt"].IsString());
- CreatedAt = value["createdAt"].GetString();
- }
-}
-
-ISolomonAccessorClient::TListMetricsResult::TListMetricsResult()
- : Success(false)
-{}
-
-ISolomonAccessorClient::TListMetricsResult::TListMetricsResult(const TString& error)
- : Success(false)
- , ErrorMsg(error)
-{}
-
-ISolomonAccessorClient::TListMetricsResult::TListMetricsResult(size_t pagesCount, std::vector<TMetric>&& result)
- : Success(true)
- , PagesCount(pagesCount)
- , Result(std::move(result))
-{}
-
-ISolomonAccessorClient::TGetDataResult::TGetDataResult()
- : Success(false)
-{}
-
-ISolomonAccessorClient::TGetDataResult::TGetDataResult(const TString& error)
- : Success(false)
- , ErrorMsg(error)
-{}
-
-ISolomonAccessorClient::TGetDataResult::TGetDataResult(std::vector<TTimeseries>&& result)
- : Success(true)
- , Result(std::move(result))
-{}
-
ISolomonAccessorClient::TPtr
ISolomonAccessorClient::Make(
- const NYql::NSo::NProto::TDqSolomonSource& source,
- std::shared_ptr<NYdb::ICredentialsProvider> credentialsProvider)
-{
+ NYql::NSo::NProto::TDqSolomonSource source,
+ std::shared_ptr<NYdb::ICredentialsProvider> credentialsProvider) {
const auto& settings = source.settings();
TString defaultReplica = "sas";
@@ -388,7 +578,7 @@ ISolomonAccessorClient::Make(
defaultGrpcPort = FromString<ui64>(it->second);
}
- return std::make_shared<TSolomonAccessorClient>(defaultReplica, defaultGrpcPort, source, credentialsProvider);
+ return std::make_shared<TSolomonAccessorClient>(defaultReplica, defaultGrpcPort, std::move(source), credentialsProvider);
}
} // namespace NYql::NSo
diff --git a/ydb/library/yql/providers/solomon/solomon_accessor/client/solomon_accessor_client.h b/ydb/library/yql/providers/solomon/solomon_accessor/client/solomon_accessor_client.h
index a680f29e7ce..87e1728e51c 100644
--- a/ydb/library/yql/providers/solomon/solomon_accessor/client/solomon_accessor_client.h
+++ b/ydb/library/yql/providers/solomon/solomon_accessor/client/solomon_accessor_client.h
@@ -1,31 +1,12 @@
#pragma once
-#include <library/cpp/json/json_reader.h>
#include <library/cpp/threading/future/core/future.h>
#include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/types/credentials/credentials.h>
#include <ydb/library/yql/providers/solomon/proto/dq_solomon_shard.pb.h>
+#include <ydb/library/yql/providers/solomon/solomon_accessor/client/solomon_client_utils.h>
namespace NYql::NSo {
-class TMetric {
-public:
- explicit TMetric(const NJson::TJsonValue& value);
-
-public:
- std::map<TString, TString> Labels;
- TString Type;
- TString CreatedAt;
-};
-
-class TTimeseries {
-public:
- TString Name;
- std::map<TString, TString> Labels;
- TString Type;
- std::vector<int64_t> Timestamps;
- std::vector<double> Values;
-};
-
class ISolomonAccessorClient {
public:
using TPtr = std::shared_ptr<ISolomonAccessorClient>;
@@ -34,37 +15,15 @@ public:
virtual ~ISolomonAccessorClient() = default;
static TPtr Make(
- const NYql::NSo::NProto::TDqSolomonSource& source,
+ NYql::NSo::NProto::TDqSolomonSource source,
std::shared_ptr<NYdb::ICredentialsProvider> credentialsProvider);
- class TListMetricsResult {
- public:
- explicit TListMetricsResult();
- explicit TListMetricsResult(const TString& errorMsg);
- explicit TListMetricsResult(size_t pagesCount, std::vector<TMetric>&& result);
-
- public:
- bool Success;
- TString ErrorMsg;
- size_t PagesCount;
- std::vector<TMetric> Result;
- };
-
- class TGetDataResult {
- public:
- explicit TGetDataResult();
- explicit TGetDataResult(const TString& errorMsg);
- explicit TGetDataResult(std::vector<TTimeseries>&& result);
-
- public:
- bool Success;
- TString ErrorMsg;
- std::vector<TTimeseries> Result;
- };
-
public:
- virtual NThreading::TFuture<TListMetricsResult> ListMetrics(const TString& selectors, int pageSize, int page) const = 0;
- virtual NThreading::TFuture<TGetDataResult> GetData(const std::vector<TString>& selectors) const = 0;
+ virtual NThreading::TFuture<TGetLabelsResponse> GetLabelNames(const TString& selectors) const = 0;
+ virtual NThreading::TFuture<TListMetricsResponse> ListMetrics(const TString& selectors, int pageSize, int page) const = 0;
+ virtual NThreading::TFuture<TGetPointsCountResponse> GetPointsCount(const std::vector<TMetric>& metrics) const = 0;
+ virtual NThreading::TFuture<TGetDataResponse> GetData(TMetric metric, TInstant from, TInstant to) const = 0;
+ virtual NThreading::TFuture<TGetDataResponse> GetData(TString selectors, TInstant from, TInstant to) const = 0;
};
} // namespace NYql::NSo
diff --git a/ydb/library/yql/providers/solomon/solomon_accessor/client/solomon_client_utils.cpp b/ydb/library/yql/providers/solomon/solomon_accessor/client/solomon_client_utils.cpp
new file mode 100644
index 00000000000..5356e508288
--- /dev/null
+++ b/ydb/library/yql/providers/solomon/solomon_accessor/client/solomon_client_utils.cpp
@@ -0,0 +1,26 @@
+#include "solomon_client_utils.h"
+
+#include <yql/essentials/utils/yql_panic.h>
+
+namespace NYql::NSo {
+
+template <typename T>
+TSolomonClientResponse<T>::TSolomonClientResponse()
+ : Status(STATUS_RETRIABLE_ERROR) {}
+
+template <typename T>
+TSolomonClientResponse<T>::TSolomonClientResponse(const TString& error)
+ : Status(STATUS_FATAL_ERROR)
+ , Error(error) {}
+
+template <typename T>
+TSolomonClientResponse<T>::TSolomonClientResponse(T&& result)
+ : Status(STATUS_OK)
+ , Result(std::move(result)) {}
+
+template class TSolomonClientResponse<TGetLabelsResult>;
+template class TSolomonClientResponse<TListMetricsResult>;
+template class TSolomonClientResponse<TGetPointsCountResult>;
+template class TSolomonClientResponse<TGetDataResult>;
+
+} // namespace NYql::NSo
diff --git a/ydb/library/yql/providers/solomon/solomon_accessor/client/solomon_client_utils.h b/ydb/library/yql/providers/solomon/solomon_accessor/client/solomon_client_utils.h
new file mode 100644
index 00000000000..6682d4e5989
--- /dev/null
+++ b/ydb/library/yql/providers/solomon/solomon_accessor/client/solomon_client_utils.h
@@ -0,0 +1,64 @@
+#pragma once
+
+#include <library/cpp/json/json_reader.h>
+
+#include <map>
+#include <vector>
+
+namespace NYql::NSo {
+
+enum EStatus {
+ STATUS_OK,
+ STATUS_RETRIABLE_ERROR,
+ STATUS_FATAL_ERROR
+};
+
+template <typename T>
+class TSolomonClientResponse {
+public:
+ TSolomonClientResponse();
+ explicit TSolomonClientResponse(const TString& error);
+ explicit TSolomonClientResponse(T&& result);
+
+ TSolomonClientResponse(const TSolomonClientResponse&) = default;
+ TSolomonClientResponse(TSolomonClientResponse&&) = default;
+
+public:
+ EStatus Status;
+ TString Error;
+ T Result;
+};
+
+struct TMetric {
+ std::map<TString, TString> Labels;
+ TString Type;
+};
+
+struct TTimeseries {
+ TMetric Metric;
+ std::vector<int64_t> Timestamps;
+ std::vector<double> Values;
+};
+
+struct TGetLabelsResult {
+ std::vector<TString> Labels;
+};
+using TGetLabelsResponse = TSolomonClientResponse<TGetLabelsResult>;
+
+struct TListMetricsResult {
+ std::vector<TMetric> Metrics;
+ ui64 PagesCount;
+};
+using TListMetricsResponse = TSolomonClientResponse<TListMetricsResult>;
+
+struct TGetPointsCountResult {
+ std::vector<ui64> PointsCount;
+};
+using TGetPointsCountResponse = TSolomonClientResponse<TGetPointsCountResult>;
+
+struct TGetDataResult {
+ std::vector<TTimeseries> Timeseries;
+};
+using TGetDataResponse = TSolomonClientResponse<TGetDataResult>;
+
+} // namespace NYql::NSo
diff --git a/ydb/library/yql/providers/solomon/solomon_accessor/client/ya.make b/ydb/library/yql/providers/solomon/solomon_accessor/client/ya.make
index 0789f4ab7b5..9ecc766d0a1 100644
--- a/ydb/library/yql/providers/solomon/solomon_accessor/client/ya.make
+++ b/ydb/library/yql/providers/solomon/solomon_accessor/client/ya.make
@@ -2,6 +2,7 @@ LIBRARY()
SRCS(
solomon_accessor_client.cpp
+ solomon_client_utils.cpp
)
PEERDIR(
diff --git a/ydb/library/yql/providers/solomon/solomon_accessor/grpc/data_service.proto b/ydb/library/yql/providers/solomon/solomon_accessor/grpc/data_service.proto
new file mode 100644
index 00000000000..947b1675ece
--- /dev/null
+++ b/ydb/library/yql/providers/solomon/solomon_accessor/grpc/data_service.proto
@@ -0,0 +1,192 @@
+syntax = "proto3";
+
+package yandex.monitoring.api.v3;
+
+import "google/protobuf/timestamp.proto";
+import "google/rpc/status.proto";
+
+option go_package = "github.com/ydb-platform/ydb/ydb/library/yql/providers/solomon/solomon_accessor/grpc;solomon_accessor_pb";
+
+// A set of methods for reading metrics data.
+service DataService {
+ // Retrieves metric data based on multi-query input.
+ rpc Read(ReadRequest) returns (ReadResponse) {
+ }
+}
+
+message Container {
+ // Default project/folder for this request.
+ oneof container {
+ // ID of the project that will be used as default.
+ // (unless it's specified in query)
+ string project_id = 1;
+
+ // Cloud folder that will be used as default.
+ // (unless it's specified in query)
+ string folder_id = 2;
+ }
+}
+
+// List of available aggregate functions for downsampling.
+message Downsampling {
+ // List of available aggregate functions for downsampling.
+ enum GridAggregation {
+ GRID_AGGREGATION_UNSPECIFIED = 0;
+
+ // Max value.
+ GRID_AGGREGATION_MAX = 1;
+
+ // Min value.
+ GRID_AGGREGATION_MIN = 2;
+
+ // Sum of values.
+ GRID_AGGREGATION_SUM = 3;
+
+ // Average value.
+ GRID_AGGREGATION_AVG = 4;
+
+ // Last value.
+ GRID_AGGREGATION_LAST = 5;
+
+ // Total count of points.
+ GRID_AGGREGATION_COUNT = 6;
+ }
+
+ // List of available gap filling policy for downsampling.
+ enum GapFilling {
+ GAP_FILLING_UNSPECIFIED = 0;
+
+ // Returns `null` as a metric value and `timestamp` as a time series value.
+ GAP_FILLING_NULL = 1;
+
+ // Returns no value and no timestamp.
+ GAP_FILLING_NONE = 2;
+
+ // Returns the value from the previous time interval.
+ GAP_FILLING_PREVIOUS = 3;
+ }
+
+ oneof mode {
+ // Maximum number of points to be returned.
+ int64 max_points = 1;
+
+ // Time interval (grid) for downsampling in milliseconds.
+ // Points in the specified range are aggregated into one time point.
+ int64 grid_interval = 2;
+
+ // Disable downsampling.
+ bool disabled = 3;
+ }
+
+ // Function that is used for downsampling.
+ GridAggregation grid_aggregation = 4;
+
+ // Parameters for filling gaps in data.
+ GapFilling gap_filling = 5;
+}
+
+// Query used in multi-source requests for metrics retrieving.
+message Query {
+ // Name of the query.
+ string name = 1;
+
+ // Text of the program that will be executed.
+ string value = 2;
+
+ // Flag for hiding result timeseries from response.
+ // Note: logic will still be executed,
+ // so result of the query processing can be used in following calculations
+ bool hidden = 3;
+}
+
+message ReadRequest {
+ // Default container (project/folder) for this request.
+ Container container = 1;
+
+ // Time from which data should be collected.
+ google.protobuf.Timestamp from_time = 3;
+
+ // Time until which data should be collected.
+ google.protobuf.Timestamp to_time = 4;
+
+ // Queries which are data gathered based on.
+ repeated Query queries = 5;
+
+ // Downsampling parameters.
+ Downsampling downsampling = 6;
+
+ // Force cluster.
+ string force_replica = 7;
+}
+
+message ReadResponse {
+ // Data response for each not hidden query.
+ repeated ResponsePerQuery response_per_query = 1;
+
+ // Error raised during request processing.
+ repeated google.rpc.Status errors = 2;
+}
+
+message ResponsePerQuery {
+ message TimeseriesVector {
+ // Timeseries values.
+ repeated TimeseriesValue values = 1;
+ }
+
+ // Name of the query.
+ string query_name = 1;
+
+ // Result values for the query.
+ oneof result_values {
+ // Timeseries vector.
+ TimeseriesVector timeseries_vector = 2;
+ }
+}
+
+enum MetricType {
+ METRIC_TYPE_UNSPECIFIED = 0;
+
+ // Gauge with fractional values.
+ DGAUGE = 1;
+
+ // Gauge with integer values.
+ IGAUGE = 2;
+
+ // Counter.
+ COUNTER = 3;
+
+ // Rate.
+ RATE = 4;
+}
+
+message TimeseriesValue {
+ // Alias.
+ string alias = 1;
+
+ // Name of the metric.
+ string name = 2;
+
+ // List of metric labels as `key:value` pairs.
+ map<string, string> labels = 3;
+
+ // Type of the metric.
+ MetricType type = 4;
+
+ // List of timestamps.
+ Int64Vector timestamp_values = 5;
+
+ oneof values {
+ // List of double values.
+ DoubleVector double_values = 6;
+ }
+}
+
+message DoubleVector {
+ // List of double values.
+ repeated double values = 1;
+}
+
+message Int64Vector {
+ // List of int64 values.
+ repeated int64 values = 1;
+}
diff --git a/ydb/library/yql/providers/solomon/solomon_accessor/grpc/ya.make b/ydb/library/yql/providers/solomon/solomon_accessor/grpc/ya.make
index cc46d279fd5..7ee874031a5 100644
--- a/ydb/library/yql/providers/solomon/solomon_accessor/grpc/ya.make
+++ b/ydb/library/yql/providers/solomon/solomon_accessor/grpc/ya.make
@@ -9,7 +9,7 @@ ENDIF()
GRPC()
SRCS(
- solomon_accessor_pb.proto
+ data_service.proto
)
USE_COMMON_GOOGLE_APIS(
diff --git a/ydb/library/yql/tests/sql/solomon/canondata/test.test_solomon-Basic-default.txt_/opt.yql b/ydb/library/yql/tests/sql/solomon/canondata/test.test_solomon-Basic-default.txt_/opt.yql
index e699fa52b7b..96756a09c5b 100644
--- a/ydb/library/yql/tests/sql/solomon/canondata/test.test_solomon-Basic-default.txt_/opt.yql
+++ b/ydb/library/yql/tests/sql/solomon/canondata/test.test_solomon-Basic-default.txt_/opt.yql
@@ -4,7 +4,7 @@
(let $3 (DataType 'String))
(let $4 (StructType '('"labels" (DictType $3 $3)) '('"ts" (DataType 'Datetime)) '('type $3) '('"value" (DataType 'Double))))
(let $5 '('"labels" '"value" '"ts" 'type))
-(let $6 (SoSourceSettings world '"my_project" (SecureParam '"cluster:default_local_solomon") $4 $5 '() '"2023-12-08T14:40:39Z" '"2023-12-08T14:45:39Z" '"" '"{}" (Bool '"false") '"AVG" '"PREVIOUS" (Uint32 '"15")))
+(let $6 (SoSourceSettings world '"my_project" (SecureParam '"cluster:default_local_solomon") $4 $5 '() '"2023-12-08T14:40:39Z" '"2023-12-08T14:45:39Z" '"" '"{}" (Bool '"false") '"AVG" '"PREVIOUS" (Uint32 '"15") '()))
(let $7 (DqStage '((DqSource (DataSource '"solomon" '"local_solomon") $6)) (lambda '($10) $10) '('('"_logical_id" '0))))
(let $8 (DqStage '((DqCnUnionAll (TDqOutput $7 '"0"))) (lambda '($11) $11) '('('"_logical_id" '0))))
(let $9 (ResPull! $1 $2 (Key) (DqCnResult (TDqOutput $8 '"0") '()) '('('type) '('autoref)) '"dq"))
diff --git a/ydb/library/yql/tests/sql/solomon/canondata/test.test_solomon-Downsampling-default.txt_/opt.yql b/ydb/library/yql/tests/sql/solomon/canondata/test.test_solomon-Downsampling-default.txt_/opt.yql
index ffdb5aec084..4a755e7e36f 100644
--- a/ydb/library/yql/tests/sql/solomon/canondata/test.test_solomon-Downsampling-default.txt_/opt.yql
+++ b/ydb/library/yql/tests/sql/solomon/canondata/test.test_solomon-Downsampling-default.txt_/opt.yql
@@ -4,7 +4,7 @@
(let $3 (DataType 'String))
(let $4 (StructType '('"labels" (DictType $3 $3)) '('"ts" (DataType 'Datetime)) '('type $3) '('"value" (DataType 'Double))))
(let $5 '('"labels" '"value" '"ts" 'type))
-(let $6 (SoSourceSettings world '"my_project" (SecureParam '"cluster:default_local_solomon") $4 $5 '() '"2023-12-08T14:40:39Z" '"2023-12-08T14:45:39Z" '"" '"{}" (Bool '"false") '"SUM" '"PREVIOUS" (Uint32 '"25")))
+(let $6 (SoSourceSettings world '"my_project" (SecureParam '"cluster:default_local_solomon") $4 $5 '() '"2023-12-08T14:40:39Z" '"2023-12-08T14:45:39Z" '"" '"{}" (Bool '"false") '"SUM" '"PREVIOUS" (Uint32 '"25") '()))
(let $7 (DqStage '((DqSource (DataSource '"solomon" '"local_solomon") $6)) (lambda '($10) $10) '('('"_logical_id" '0))))
(let $8 (DqStage '((DqCnUnionAll (TDqOutput $7 '"0"))) (lambda '($11) $11) '('('"_logical_id" '0))))
(let $9 (ResPull! $1 $2 (Key) (DqCnResult (TDqOutput $8 '"0") '()) '('('type) '('autoref)) '"dq"))
diff --git a/ydb/library/yql/tests/sql/solomon/canondata/test.test_solomon-DownsamplingValidSettings-default.txt_/opt.yql b/ydb/library/yql/tests/sql/solomon/canondata/test.test_solomon-DownsamplingValidSettings-default.txt_/opt.yql
index 83000c22cb2..1a969a8de67 100644
--- a/ydb/library/yql/tests/sql/solomon/canondata/test.test_solomon-DownsamplingValidSettings-default.txt_/opt.yql
+++ b/ydb/library/yql/tests/sql/solomon/canondata/test.test_solomon-DownsamplingValidSettings-default.txt_/opt.yql
@@ -4,7 +4,7 @@
(let $3 (DataType 'String))
(let $4 (StructType '('"labels" (DictType $3 $3)) '('"ts" (DataType 'Datetime)) '('type $3) '('"value" (DataType 'Double))))
(let $5 '('"labels" '"value" '"ts" 'type))
-(let $6 (SoSourceSettings world '"my_project" (SecureParam '"cluster:default_local_solomon") $4 $5 '() '"2023-12-08T14:40:39Z" '"2023-12-08T14:45:39Z" '"" '"{}" (Bool '"false") '"SUM" '"PREVIOUS" (Uint32 '"15")))
+(let $6 (SoSourceSettings world '"my_project" (SecureParam '"cluster:default_local_solomon") $4 $5 '() '"2023-12-08T14:40:39Z" '"2023-12-08T14:45:39Z" '"" '"{}" (Bool '"false") '"SUM" '"PREVIOUS" (Uint32 '"15") '()))
(let $7 (DqStage '((DqSource (DataSource '"solomon" '"local_solomon") $6)) (lambda '($10) $10) '('('"_logical_id" '0))))
(let $8 (DqStage '((DqCnUnionAll (TDqOutput $7 '"0"))) (lambda '($11) $11) '('('"_logical_id" '0))))
(let $9 (ResPull! $1 $2 (Key) (DqCnResult (TDqOutput $8 '"0") '()) '('('type) '('autoref)) '"dq"))
diff --git a/ydb/library/yql/tests/sql/solomon/canondata/test.test_solomon-HistResponse-default.txt_/opt.yql b/ydb/library/yql/tests/sql/solomon/canondata/test.test_solomon-HistResponse-default.txt_/opt.yql
index 972840ac7b1..6c61ea29c43 100644
--- a/ydb/library/yql/tests/sql/solomon/canondata/test.test_solomon-HistResponse-default.txt_/opt.yql
+++ b/ydb/library/yql/tests/sql/solomon/canondata/test.test_solomon-HistResponse-default.txt_/opt.yql
@@ -4,7 +4,7 @@
(let $3 (DataType 'String))
(let $4 (StructType '('"labels" (DictType $3 $3)) '('"ts" (DataType 'Datetime)) '('type $3) '('"value" (DataType 'Double))))
(let $5 '('"labels" '"value" '"ts" 'type))
-(let $6 (SoSourceSettings world '"hist" (SecureParam '"cluster:default_local_solomon") $4 $5 '() '"2023-12-08T14:40:39Z" '"2023-12-08T14:45:39Z" '"" '"histogram_percentile(95, {})" (Bool '"false") '"AVG" '"PREVIOUS" (Uint32 '"15")))
+(let $6 (SoSourceSettings world '"hist" (SecureParam '"cluster:default_local_solomon") $4 $5 '() '"2023-12-08T14:40:39Z" '"2023-12-08T14:45:39Z" '"" '"histogram_percentile(95, {})" (Bool '"false") '"AVG" '"PREVIOUS" (Uint32 '"15") '()))
(let $7 (DqStage '((DqSource (DataSource '"solomon" '"local_solomon") $6)) (lambda '($10) $10) '('('"_logical_id" '0))))
(let $8 (DqStage '((DqCnUnionAll (TDqOutput $7 '"0"))) (lambda '($11) $11) '('('"_logical_id" '0))))
(let $9 (ResPull! $1 $2 (Key) (DqCnResult (TDqOutput $8 '"0") '()) '('('type) '('autoref)) '"dq"))
diff --git a/ydb/library/yql/tests/sql/solomon/canondata/test.test_solomon-LabelColumns-default.txt_/opt.yql b/ydb/library/yql/tests/sql/solomon/canondata/test.test_solomon-LabelColumns-default.txt_/opt.yql
index 9eb080067b6..ba0fe486fab 100644
--- a/ydb/library/yql/tests/sql/solomon/canondata/test.test_solomon-LabelColumns-default.txt_/opt.yql
+++ b/ydb/library/yql/tests/sql/solomon/canondata/test.test_solomon-LabelColumns-default.txt_/opt.yql
@@ -8,7 +8,7 @@
(let $7 (StructType '($3 $4) '($5 $4) '($6 $4) '('"project" $4) '('"ts" (DataType 'Datetime)) '('type $4) '('"value" (DataType 'Double))))
(let $8 '('"value" '"ts" 'type))
(let $9 '($3 $5 '"project" $6))
-(let $10 (SoSourceSettings world '"my_project" (SecureParam '"cluster:default_local_solomon") $7 $8 $9 '"2023-12-08T14:40:39Z" '"2023-12-08T14:45:39Z" '"" '"{}" (Bool '"false") '"AVG" '"PREVIOUS" (Uint32 '"15")))
+(let $10 (SoSourceSettings world '"my_project" (SecureParam '"cluster:default_local_solomon") $7 $8 $9 '"2023-12-08T14:40:39Z" '"2023-12-08T14:45:39Z" '"" '"{}" (Bool '"false") '"AVG" '"PREVIOUS" (Uint32 '"15") '()))
(let $11 (DqStage '((DqSource (DataSource '"solomon" '"local_solomon") $10)) (lambda '($14) $14) '('('"_logical_id" '0))))
(let $12 (DqStage '((DqCnUnionAll (TDqOutput $11 '"0"))) (lambda '($15) $15) '('('"_logical_id" '0))))
(let $13 (ResPull! $1 $2 (Key) (DqCnResult (TDqOutput $12 '"0") '()) '('('type) '('autoref)) '"dq"))
diff --git a/ydb/library/yql/tests/sql/solomon/canondata/test.test_solomon-Subquery-default.txt_/opt.yql b/ydb/library/yql/tests/sql/solomon/canondata/test.test_solomon-Subquery-default.txt_/opt.yql
index 5b038c22f4e..508f856cab9 100644
--- a/ydb/library/yql/tests/sql/solomon/canondata/test.test_solomon-Subquery-default.txt_/opt.yql
+++ b/ydb/library/yql/tests/sql/solomon/canondata/test.test_solomon-Subquery-default.txt_/opt.yql
@@ -4,7 +4,7 @@
(let $3 (DataType 'String))
(let $4 (StructType '('"labels" (DictType $3 $3)) '('"ts" (DataType 'Datetime)) '('type $3) '('"value" (DataType 'Double))))
(let $5 '('"labels" '"value" '"ts" 'type))
-(let $6 (SoSourceSettings world '"my_project" (SecureParam '"cluster:default_local_solomon") $4 $5 '() '"2023-12-08T14:40:39Z" '"2023-12-08T14:45:39Z" '"" '"{}" (Bool '"false") '"SUM" '"PREVIOUS" (Uint32 '"25")))
+(let $6 (SoSourceSettings world '"my_project" (SecureParam '"cluster:default_local_solomon") $4 $5 '() '"2023-12-08T14:40:39Z" '"2023-12-08T14:45:39Z" '"" '"{}" (Bool '"false") '"SUM" '"PREVIOUS" (Uint32 '"25") '()))
(let $7 (DqStage '((DqSource (DataSource '"solomon" '"local_solomon") $6)) (lambda '($11) $11) '('('"_logical_id" '0))))
(let $8 (DqStage '((DqCnUnionAll (TDqOutput $7 '"0"))) (lambda '($12) $12) '('('"_logical_id" '0))))
(let $9 '('('type) '('autoref) '('unordered)))
diff --git a/ydb/library/yql/tools/solomon_emulator_grpc/__main__.py b/ydb/library/yql/tools/solomon_emulator_grpc/__main__.py
index 0669b8cd424..9ba01f1f6f0 100644
--- a/ydb/library/yql/tools/solomon_emulator_grpc/__main__.py
+++ b/ydb/library/yql/tools/solomon_emulator_grpc/__main__.py
@@ -7,9 +7,9 @@ from library.python.testing.recipe import declare_recipe, set_env
from library.recipes.common import find_free_ports
import grpc
-from ydb.library.yql.providers.solomon.solomon_accessor.grpc.solomon_accessor_pb_pb2_grpc import \
+from ydb.library.yql.providers.solomon.solomon_accessor.grpc.data_service_pb2_grpc import \
DataServiceServicer, add_DataServiceServicer_to_server
-from ydb.library.yql.providers.solomon.solomon_accessor.grpc.solomon_accessor_pb_pb2 import ReadRequest, ReadResponse, MetricType
+from ydb.library.yql.providers.solomon.solomon_accessor.grpc.data_service_pb2 import ReadRequest, ReadResponse, MetricType
PID_FILENAME = "solomon_recipe.pid"