diff options
author | Oleg Doronin <fortan57@gmail.com> | 2022-05-01 02:01:29 +0300 |
---|---|---|
committer | Oleg Doronin <fortan57@gmail.com> | 2022-05-01 02:01:29 +0300 |
commit | 65e4760cd73b3929ab5eb735210facfff8822495 (patch) | |
tree | 1efbe70bcc64c4573fc6fd673204689b06b608ab | |
parent | d227d4f3e8008cea9f135e271a5b7d56db6b5c44 (diff) | |
download | ydb-65e4760cd73b3929ab5eb735210facfff8822495.tar.gz |
test connection YQ-48
test connection for monitoring
os + cleanup
list streams for pq
test connection for yds
ref:d44fd535fb66f103981f959c1d1eeeaceddb4944
35 files changed, 1688 insertions, 387 deletions
diff --git a/CMakeLists.darwin.txt b/CMakeLists.darwin.txt index 6756a81997..91b10039eb 100644 --- a/CMakeLists.darwin.txt +++ b/CMakeLists.darwin.txt @@ -808,13 +808,13 @@ add_subdirectory(ydb/core/yq/libs/health) add_subdirectory(ydb/public/sdk/cpp/client/ydb_discovery) add_subdirectory(ydb/core/yq/libs/test_connection) add_subdirectory(ydb/core/yq/libs/test_connection/events) +add_subdirectory(ydb/library/yql/providers/solomon/async_io) +add_subdirectory(library/cpp/json/easy_parse) add_subdirectory(ydb/library/yql/dq/comp_nodes) add_subdirectory(ydb/library/yql/providers/clickhouse/actors) add_subdirectory(ydb/library/yql/providers/pq/async_io) add_subdirectory(ydb/library/yql/providers/pq/gateway/native) add_subdirectory(ydb/library/yql/providers/s3/actors) -add_subdirectory(ydb/library/yql/providers/solomon/async_io) -add_subdirectory(library/cpp/json/easy_parse) add_subdirectory(ydb/library/yql/providers/solomon/gateway) add_subdirectory(ydb/library/yql/providers/solomon/provider) add_subdirectory(ydb/library/yql/providers/solomon/expr_nodes) diff --git a/CMakeLists.linux.txt b/CMakeLists.linux.txt index 0353dc8745..da3fa70606 100644 --- a/CMakeLists.linux.txt +++ b/CMakeLists.linux.txt @@ -888,13 +888,13 @@ add_subdirectory(ydb/core/yq/libs/health) add_subdirectory(ydb/public/sdk/cpp/client/ydb_discovery) add_subdirectory(ydb/core/yq/libs/test_connection) add_subdirectory(ydb/core/yq/libs/test_connection/events) +add_subdirectory(ydb/library/yql/providers/solomon/async_io) +add_subdirectory(library/cpp/json/easy_parse) add_subdirectory(ydb/library/yql/dq/comp_nodes) add_subdirectory(ydb/library/yql/providers/clickhouse/actors) add_subdirectory(ydb/library/yql/providers/pq/async_io) add_subdirectory(ydb/library/yql/providers/pq/gateway/native) add_subdirectory(ydb/library/yql/providers/s3/actors) -add_subdirectory(ydb/library/yql/providers/solomon/async_io) -add_subdirectory(library/cpp/json/easy_parse) add_subdirectory(ydb/library/yql/providers/solomon/gateway) add_subdirectory(ydb/library/yql/providers/solomon/provider) add_subdirectory(ydb/library/yql/providers/solomon/expr_nodes) diff --git a/ydb/core/yq/libs/actors/clusters_from_connections.cpp b/ydb/core/yq/libs/actors/clusters_from_connections.cpp index 93e0471892..8ddc4096ae 100644 --- a/ydb/core/yq/libs/actors/clusters_from_connections.cpp +++ b/ydb/core/yq/libs/actors/clusters_from_connections.cpp @@ -13,7 +13,9 @@ using namespace NYql; namespace { template <typename TClusterConfig> -void FillClusterAuth(TClusterConfig& clusterCfg, const YandexQuery::IamAuth& auth, const TString& authToken, const THashMap<TString, TString>& accountIdSignatures) { +void FillClusterAuth(TClusterConfig& clusterCfg, + const YandexQuery::IamAuth& auth, const TString& authToken, + const THashMap<TString, TString>& accountIdSignatures) { switch (auth.identity_case()) { case YandexQuery::IamAuth::kNone: break; @@ -30,8 +32,100 @@ void FillClusterAuth(TClusterConfig& clusterCfg, const YandexQuery::IamAuth& aut } } +void FillPqClusterConfig(NYql::TPqClusterConfig& clusterConfig, + const TString& name, bool useBearerForYdb, + const TString& authToken, const THashMap<TString, TString>& accountIdSignatures, + const YandexQuery::DataStreams& ds) { + clusterConfig.SetName(name); + if (ds.endpoint()) { + clusterConfig.SetEndpoint(ds.endpoint()); + } + clusterConfig.SetDatabase(ds.database()); + clusterConfig.SetDatabaseId(ds.database_id()); + clusterConfig.SetUseSsl(ds.secure()); + clusterConfig.SetAddBearerToToken(useBearerForYdb); + clusterConfig.SetClusterType(TPqClusterConfig::CT_DATA_STREAMS); + FillClusterAuth(clusterConfig, ds.auth(), authToken, accountIdSignatures); +} + +void FillS3ClusterConfig(NYql::TS3ClusterConfig& clusterConfig, + const TString& name, const TString& authToken, + const TString& objectStorageEndpoint, + const THashMap<TString, TString>& accountIdSignatures, + const YandexQuery::ObjectStorageConnection& s3) { + clusterConfig.SetName(name); + TString objectStorageUrl; + if (objectStorageEndpoint == "https://s3.mds.yandex.net") { + objectStorageUrl = TStringBuilder() << "https://" << s3.bucket() << ".s3.mds.yandex.net/"; + } else { + objectStorageUrl = TStringBuilder() << objectStorageEndpoint << '/' << s3.bucket() << '/'; + } + clusterConfig.SetUrl(objectStorageUrl); + FillClusterAuth(clusterConfig, s3.auth(), authToken, accountIdSignatures); +} + +void FillSolomonClusterConfig(NYql::TSolomonClusterConfig& clusterConfig, + const TString& name, const TString& authToken, + const THashMap<TString, TString>& accountIdSignatures, + const YandexQuery::Monitoring& monitoring) { + clusterConfig.SetName(name); + + // TODO: move Endpoint to yq config + auto solomonEndpointForTest = GetEnv("SOLOMON_ENDPOINT"); + auto solomonEndpoint = solomonEndpointForTest ? TString(solomonEndpointForTest) : TString(); + if (solomonEndpoint.empty()) { + if (name.StartsWith("pre")) { + solomonEndpoint = "monitoring.api.cloud-preprod.yandex.net"; + clusterConfig.SetUseSsl(true); + } else if (name.StartsWith("so")) { + solomonEndpoint = "solomon.yandex.net"; + } else { + solomonEndpoint = "monitoring.api.cloud.yandex.net"; + clusterConfig.SetUseSsl(true); + } + } + + clusterConfig.SetCluster(solomonEndpoint); + clusterConfig.SetClusterType(TSolomonClusterConfig::SCT_MONITORING); + clusterConfig.MutablePath()->SetProject(monitoring.project()); + clusterConfig.MutablePath()->SetCluster(monitoring.cluster()); + FillClusterAuth(clusterConfig, monitoring.auth(), authToken, accountIdSignatures); +} + } //namespace +NYql::TPqClusterConfig CreatePqClusterConfig(const TString& name, + bool useBearerForYdb, const TString& authToken, + const TString& accountSignature, const YandexQuery::DataStreams& ds) { + NYql::TPqClusterConfig cluster; + THashMap<TString, TString> accountIdSignatures; + if (ds.auth().has_service_account()) { + accountIdSignatures[ds.auth().service_account().id()] = accountSignature; + } + FillPqClusterConfig(cluster, name, useBearerForYdb, authToken, accountIdSignatures, ds); + return cluster; +} + +NYql::TS3ClusterConfig CreateS3ClusterConfig(const TString& name, + const TString& authToken, const TString& objectStorageEndpoint, + const TString& accountSignature, const YandexQuery::ObjectStorageConnection& s3) { + NYql::TS3ClusterConfig cluster; + THashMap<TString, TString> accountIdSignatures; + accountIdSignatures[s3.auth().service_account().id()] = accountSignature; + FillS3ClusterConfig(cluster, name, authToken, objectStorageEndpoint, accountIdSignatures, s3); + return cluster; +} + +NYql::TSolomonClusterConfig CreateSolomonClusterConfig(const TString& name, + const TString& authToken, const TString& accountSignature, + const YandexQuery::Monitoring& monitoring) { + NYql::TSolomonClusterConfig cluster; + THashMap<TString, TString> accountIdSignatures; + accountIdSignatures[monitoring.auth().service_account().id()] = accountSignature; + FillSolomonClusterConfig(cluster, name, authToken, accountIdSignatures, monitoring); + return cluster; +} + void AddClustersFromConnections(const THashMap<TString, YandexQuery::Connection>& connections, bool useBearerForYdb, const TString& objectStorageEndpoint, @@ -73,58 +167,21 @@ void AddClustersFromConnections(const THashMap<TString, YandexQuery::Connection> case YandexQuery::ConnectionSetting::kObjectStorage: { const auto& s3 = conn.content().setting().object_storage(); auto* clusterCfg = gatewaysConfig.MutableS3()->AddClusterMapping(); - clusterCfg->SetName(connectionName); - TString objectStorageUrl; - if (objectStorageEndpoint == "https://s3.mds.yandex.net") { - objectStorageUrl = TStringBuilder() << "https://" << s3.bucket() << ".s3.mds.yandex.net/"; - } else { - objectStorageUrl = TStringBuilder() << objectStorageEndpoint << '/' << s3.bucket() << '/'; - } - clusterCfg->SetUrl(objectStorageUrl); - FillClusterAuth(*clusterCfg, s3.auth(), authToken, accountIdSignatures); + FillS3ClusterConfig(*clusterCfg, connectionName, authToken, objectStorageEndpoint, accountIdSignatures, s3); clusters.emplace(connectionName, S3ProviderName); break; } case YandexQuery::ConnectionSetting::kDataStreams: { const auto& ds = conn.content().setting().data_streams(); auto* clusterCfg = gatewaysConfig.MutablePq()->AddClusterMapping(); - clusterCfg->SetName(connectionName); - if (ds.endpoint()) - clusterCfg->SetEndpoint(ds.endpoint()); - clusterCfg->SetDatabase(ds.database()); - clusterCfg->SetDatabaseId(ds.database_id()); - clusterCfg->SetUseSsl(ds.secure()); - clusterCfg->SetAddBearerToToken(useBearerForYdb); - clusterCfg->SetClusterType(TPqClusterConfig::CT_DATA_STREAMS); - FillClusterAuth(*clusterCfg, ds.auth(), authToken, accountIdSignatures); + FillPqClusterConfig(*clusterCfg, connectionName, useBearerForYdb, authToken, accountIdSignatures, ds); clusters.emplace(connectionName, PqProviderName); break; } case YandexQuery::ConnectionSetting::kMonitoring: { const auto& monitoring = conn.content().setting().monitoring(); auto* clusterCfg = gatewaysConfig.MutableSolomon()->AddClusterMapping(); - clusterCfg->SetName(connectionName); - - // TODO: move Endpoint to yq config - auto solomonEndpointForTest = GetEnv("SOLOMON_ENDPOINT"); - auto solomonEndpoint = solomonEndpointForTest ? TString(solomonEndpointForTest) : TString(); - if (solomonEndpoint.empty()) { - if (connectionName.StartsWith("pre")) { - solomonEndpoint = "monitoring.api.cloud-preprod.yandex.net"; - clusterCfg->SetUseSsl(true); - } else if (connectionName.StartsWith("so")) { - solomonEndpoint = "solomon.yandex.net"; - } else { - solomonEndpoint = "monitoring.api.cloud.yandex.net"; - clusterCfg->SetUseSsl(true); - } - } - - clusterCfg->SetCluster(solomonEndpoint); - clusterCfg->SetClusterType(TSolomonClusterConfig::SCT_MONITORING); - clusterCfg->MutablePath()->SetProject(monitoring.project()); - clusterCfg->MutablePath()->SetCluster(monitoring.cluster()); - FillClusterAuth(*clusterCfg, monitoring.auth(), authToken, accountIdSignatures); + FillSolomonClusterConfig(*clusterCfg, connectionName, authToken, accountIdSignatures, monitoring); clusters.emplace(connectionName, SolomonProviderName); break; } diff --git a/ydb/core/yq/libs/actors/clusters_from_connections.h b/ydb/core/yq/libs/actors/clusters_from_connections.h index 08db9704ea..88c925d74d 100644 --- a/ydb/core/yq/libs/actors/clusters_from_connections.h +++ b/ydb/core/yq/libs/actors/clusters_from_connections.h @@ -5,6 +5,12 @@ namespace NYq { +NYql::TPqClusterConfig CreatePqClusterConfig(const TString& name, bool useBearerForYdb, const TString& authToken, const TString& accountSignature, const YandexQuery::DataStreams& ds); + +NYql::TS3ClusterConfig CreateS3ClusterConfig(const TString& name, const TString& authToken, const TString& objectStorageEndpoint, const TString& accountSignature, const YandexQuery::ObjectStorageConnection& s3); + +NYql::TSolomonClusterConfig CreateSolomonClusterConfig(const TString& name, const TString& authToken, const TString& accountSignature, const YandexQuery::Monitoring& monitoring); + void AddClustersFromConnections(const THashMap<TString, YandexQuery::Connection>& connections, bool useBearerForYdb, const TString& objectStorageEndpoint, diff --git a/ydb/core/yq/libs/actors/logging/log.h b/ydb/core/yq/libs/actors/logging/log.h index 6fa331b986..78270e26ed 100644 --- a/ydb/core/yq/libs/actors/logging/log.h +++ b/ydb/core/yq/libs/actors/logging/log.h @@ -165,6 +165,16 @@ #define LOG_YQ_TEST_CONNECTION_DEBUG(logRecordStream) LOG_STREAMS_IMPL(DEBUG, YQ_TEST_CONNECTION, logRecordStream) #define LOG_YQ_TEST_CONNECTION_TRACE(logRecordStream) LOG_STREAMS_IMPL(TRACE, YQ_TEST_CONNECTION, logRecordStream) +#define LOG_YQ_TEST_CONNECTION_AS_EMERG(actorSystem, logRecordStream) LOG_STREAMS_IMPL_AS(actorSystem, EMERG, YQ_TEST_CONNECTION, logRecordStream) +#define LOG_YQ_TEST_CONNECTION_AS_ALERT(actorSystem, logRecordStream) LOG_STREAMS_IMPL_AS(actorSystem, ALERT, YQ_TEST_CONNECTION, logRecordStream) +#define LOG_YQ_TEST_CONNECTION_AS_CRIT(actorSystem, logRecordStream) LOG_STREAMS_IMPL_AS(actorSystem, CRIT, YQ_TEST_CONNECTION, logRecordStream) +#define LOG_YQ_TEST_CONNECTION_AS_ERROR(actorSystem, logRecordStream) LOG_STREAMS_IMPL_AS(actorSystem, ERROR, YQ_TEST_CONNECTION, logRecordStream) +#define LOG_YQ_TEST_CONNECTION_AS_WARN(actorSystem, logRecordStream) LOG_STREAMS_IMPL_AS(actorSystem, WARN, YQ_TEST_CONNECTION, logRecordStream) +#define LOG_YQ_TEST_CONNECTION_AS_NOTICE(actorSystem, logRecordStream) LOG_STREAMS_IMPL_AS(actorSystem, NOTICE, YQ_TEST_CONNECTION, logRecordStream) +#define LOG_YQ_TEST_CONNECTION_AS_INFO(actorSystem, logRecordStream) LOG_STREAMS_IMPL_AS(actorSystem, INFO, YQ_TEST_CONNECTION, logRecordStream) +#define LOG_YQ_TEST_CONNECTION_AS_DEBUG(actorSystem, logRecordStream) LOG_STREAMS_IMPL_AS(actorSystem, DEBUG, YQ_TEST_CONNECTION, logRecordStream) +#define LOG_YQ_TEST_CONNECTION_AS_TRACE(actorSystem, logRecordStream) LOG_STREAMS_IMPL_AS(actorSystem, TRACE, YQ_TEST_CONNECTION, logRecordStream) + // Component: YQ_AUDIT. #define LOG_YQ_AUDIT_SERVICE_EMERG(logRecordStream) LOG_STREAMS_IMPL(EMERG, YQ_AUDIT, logRecordStream) #define LOG_YQ_AUDIT_SERVICE_ALERT(logRecordStream) LOG_STREAMS_IMPL(ALERT, YQ_AUDIT, logRecordStream) diff --git a/ydb/core/yq/libs/common/util.h b/ydb/core/yq/libs/common/util.h new file mode 100644 index 0000000000..aec53efd76 --- /dev/null +++ b/ydb/core/yq/libs/common/util.h @@ -0,0 +1,15 @@ +#pragma once + +#include <array> + +namespace NYq { + +template<std::size_t K, typename T, std::size_t N> +auto CreateArray(const T(&list)[N]) -> std::array<T, K> { + static_assert(N == K, "not valid array size"); + std::array<T, K> result; + std::copy(std::begin(list), std::end(list), std::begin(result)); + return result; +} + +} // namespace NYq diff --git a/ydb/core/yq/libs/control_plane_storage/CMakeLists.txt b/ydb/core/yq/libs/control_plane_storage/CMakeLists.txt index afa4d67a9a..6db7c58dab 100644 --- a/ydb/core/yq/libs/control_plane_storage/CMakeLists.txt +++ b/ydb/core/yq/libs/control_plane_storage/CMakeLists.txt @@ -36,10 +36,12 @@ target_link_libraries(yq-libs-control_plane_storage PUBLIC yql-public-issue ) target_sources(yq-libs-control_plane_storage PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/yq/libs/control_plane_storage/config.cpp ${CMAKE_SOURCE_DIR}/ydb/core/yq/libs/control_plane_storage/control_plane_storage_counters.cpp ${CMAKE_SOURCE_DIR}/ydb/core/yq/libs/control_plane_storage/exceptions.cpp ${CMAKE_SOURCE_DIR}/ydb/core/yq/libs/control_plane_storage/in_memory_control_plane_storage.cpp ${CMAKE_SOURCE_DIR}/ydb/core/yq/libs/control_plane_storage/probes.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/yq/libs/control_plane_storage/request_validators.cpp ${CMAKE_SOURCE_DIR}/ydb/core/yq/libs/control_plane_storage/util.cpp ${CMAKE_SOURCE_DIR}/ydb/core/yq/libs/control_plane_storage/validators.cpp ${CMAKE_SOURCE_DIR}/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage.cpp diff --git a/ydb/core/yq/libs/control_plane_storage/config.cpp b/ydb/core/yq/libs/control_plane_storage/config.cpp new file mode 100644 index 0000000000..afebde45a0 --- /dev/null +++ b/ydb/core/yq/libs/control_plane_storage/config.cpp @@ -0,0 +1,56 @@ +#include "config.h" +#include "util.h" + +namespace NYq { + +namespace { + +YandexQuery::ConnectionSetting::ConnectionCase GetConnectionType(const TString& typeStr) { + YandexQuery::ConnectionSetting::ConnectionType type = YandexQuery::ConnectionSetting::CONNECTION_TYPE_UNSPECIFIED; + YandexQuery::ConnectionSetting::ConnectionType_Parse(typeStr, &type); + return static_cast<YandexQuery::ConnectionSetting::ConnectionCase>(type); +} + +YandexQuery::BindingSetting::BindingCase GetBindingType(const TString& typeStr) { + YandexQuery::BindingSetting::BindingType type = YandexQuery::BindingSetting::BINDING_TYPE_UNSPECIFIED; + YandexQuery::BindingSetting::BindingType_Parse(typeStr, &type); + return static_cast<YandexQuery::BindingSetting::BindingCase>(type); +} + +} + +TControlPlaneStorageConfig::TControlPlaneStorageConfig(const NConfig::TControlPlaneStorageConfig& config, const NConfig::TCommonConfig& common) + : Proto(FillDefaultParameters(config)) + , IdsPrefix(common.GetIdsPrefix()) + , IdempotencyKeyTtl(GetDuration(Proto.GetIdempotencyKeysTtl(), TDuration::Minutes(10))) + , AutomaticQueriesTtl(GetDuration(Proto.GetAutomaticQueriesTtl(), TDuration::Days(1))) + , ResultSetsTtl(GetDuration(Proto.GetResultSetsTtl(), TDuration::Days(1))) + , AnalyticsRetryCounterUpdateTime(GetDuration(Proto.GetAnalyticsRetryCounterUpdateTime(), TDuration::Days(1))) + , StreamingRetryCounterUpdateTime(GetDuration(Proto.GetAnalyticsRetryCounterUpdateTime(), TDuration::Days(1))) + , TaskLeaseTtl(GetDuration(Proto.GetTaskLeaseTtl(), TDuration::Seconds(30))) +{ + for (const auto& availableConnection : Proto.GetAvailableConnection()) { + AvailableConnections.insert(GetConnectionType(availableConnection)); + } + + for (const auto& availableBinding : Proto.GetAvailableBinding()) { + AvailableBindings.insert(GetBindingType(availableBinding)); + } + + for (const auto& mapping : Proto.GetRetryPolicyMapping()) { + auto& retryPolicy = mapping.GetPolicy(); + auto retryCount = retryPolicy.GetRetryCount(); + auto retryPeriod = GetDuration(retryPolicy.GetRetryPeriod(), TDuration::Hours(1)); + auto backoffPeriod = GetDuration(retryPolicy.GetBackoffPeriod(), TDuration::Zero()); + for (const auto statusCode: mapping.GetStatusCode()) { + RetryPolicies.emplace(statusCode, TRetryPolicyItem(retryCount, retryPeriod, backoffPeriod)); + } + } + + if (Proto.HasTaskLeaseRetryPolicy()) { + TaskLeaseRetryPolicy.RetryCount = Proto.GetTaskLeaseRetryPolicy().GetRetryCount(); + TaskLeaseRetryPolicy.RetryPeriod = GetDuration(Proto.GetTaskLeaseRetryPolicy().GetRetryPeriod(), TDuration::Days(1)); + } +} + +} // NYq diff --git a/ydb/core/yq/libs/control_plane_storage/config.h b/ydb/core/yq/libs/control_plane_storage/config.h new file mode 100644 index 0000000000..bd637932ee --- /dev/null +++ b/ydb/core/yq/libs/control_plane_storage/config.h @@ -0,0 +1,32 @@ +#pragma once + +#include "util.h" + +#include <ydb/core/yq/libs/config/protos/common.pb.h> +#include <ydb/core/yq/libs/config/protos/control_plane_storage.pb.h> +#include <ydb/public/api/protos/yq.pb.h> + +#include <util/datetime/base.h> +#include <util/generic/hash.h> +#include <util/generic/set.h> + +namespace NYq { + +struct TControlPlaneStorageConfig { + NConfig::TControlPlaneStorageConfig Proto; + TString IdsPrefix; + TDuration IdempotencyKeyTtl; + TDuration AutomaticQueriesTtl; + TDuration ResultSetsTtl; + TDuration AnalyticsRetryCounterUpdateTime; + TDuration StreamingRetryCounterUpdateTime; + TDuration TaskLeaseTtl; + TSet<YandexQuery::ConnectionSetting::ConnectionCase> AvailableConnections; + TSet<YandexQuery::BindingSetting::BindingCase> AvailableBindings; + THashMap<ui64, TRetryPolicyItem> RetryPolicies; + TRetryPolicyItem TaskLeaseRetryPolicy; + + TControlPlaneStorageConfig(const NConfig::TControlPlaneStorageConfig& config, const NConfig::TCommonConfig& common); +}; + +} // NYq diff --git a/ydb/core/yq/libs/control_plane_storage/internal/task_ping.cpp b/ydb/core/yq/libs/control_plane_storage/internal/task_ping.cpp index 74616c9caf..5d33c45bc3 100644 --- a/ydb/core/yq/libs/control_plane_storage/internal/task_ping.cpp +++ b/ydb/core/yq/libs/control_plane_storage/internal/task_ping.cpp @@ -99,7 +99,6 @@ std::tuple<TString, TParams, const std::function<std::pair<TString, NYdb::TParam TDuration backoff = taskLeaseTtl; if (request->ResignQuery) { - TRetryPolicyItem policy(0, TDuration::Seconds(1), TDuration::Zero()); auto it = retryPolicies.find(request->StatusCode); auto policyFound = it != retryPolicies.end(); diff --git a/ydb/core/yq/libs/control_plane_storage/request_validators.cpp b/ydb/core/yq/libs/control_plane_storage/request_validators.cpp new file mode 100644 index 0000000000..861d90e7d9 --- /dev/null +++ b/ydb/core/yq/libs/control_plane_storage/request_validators.cpp @@ -0,0 +1,108 @@ +#include "request_validators.h" + +namespace NYq { + +NYql::TIssues ValidateConnectionSetting(const YandexQuery::ConnectionSetting& setting, const TSet<YandexQuery::ConnectionSetting::ConnectionCase>& availableConnections, bool disableCurrentIam, bool clickHousePasswordRequire) { + NYql::TIssues issues; + if (!availableConnections.contains(setting.connection_case())) { + issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "connection of the specified type is disabled")); + } + + switch (setting.connection_case()) { + case YandexQuery::ConnectionSetting::kYdbDatabase: { + const YandexQuery::YdbDatabase database = setting.ydb_database(); + if (!database.has_auth() || database.auth().identity_case() == YandexQuery::IamAuth::IDENTITY_NOT_SET) { + issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "content.setting.ydb_database.auth field is not specified")); + } + + if (database.auth().identity_case() == YandexQuery::IamAuth::kCurrentIam && disableCurrentIam) { + issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "current iam authorization is disabled")); + } + + if (!database.database_id() && !(database.endpoint() && database.database())) { + issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "content.setting.ydb_database.{database_id or database,endpoint} field is not specified")); + } + break; + } + case YandexQuery::ConnectionSetting::kClickhouseCluster: { + const YandexQuery::ClickHouseCluster ch = setting.clickhouse_cluster(); + if (!ch.has_auth() || ch.auth().identity_case() == YandexQuery::IamAuth::IDENTITY_NOT_SET) { + issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "content.setting.clickhouse_cluster.auth field is not specified")); + } + + if (ch.auth().identity_case() == YandexQuery::IamAuth::kCurrentIam && disableCurrentIam) { + issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "current iam authorization is disabled")); + } + + if (!ch.database_id() && !(ch.host() && ch.port())) { + issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "content.setting.clickhouse_cluster.{database_id or host,port} field is not specified")); + } + + if (!ch.login()) { + issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "content.setting.clickhouse_cluster.login field is not specified")); + } + + if (!ch.password() && clickHousePasswordRequire) { + issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "content.setting.clickhouse_cluster.password field is not specified")); + } + break; + } + case YandexQuery::ConnectionSetting::kObjectStorage: { + const YandexQuery::ObjectStorageConnection objectStorage = setting.object_storage(); + if (!objectStorage.has_auth() || objectStorage.auth().identity_case() == YandexQuery::IamAuth::IDENTITY_NOT_SET) { + issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "content.setting.object_storage.auth field is not specified")); + } + + if (objectStorage.auth().identity_case() == YandexQuery::IamAuth::kCurrentIam && disableCurrentIam) { + issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "current iam authorization is disabled")); + } + + if (!objectStorage.bucket()) { + issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "content.setting.object_storage.bucket field is not specified")); + } + break; + } + case YandexQuery::ConnectionSetting::kDataStreams: { + const YandexQuery::DataStreams dataStreams = setting.data_streams(); + if (!dataStreams.has_auth() || dataStreams.auth().identity_case() == YandexQuery::IamAuth::IDENTITY_NOT_SET) { + issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "content.setting.data_streams.auth field is not specified")); + } + + if (dataStreams.auth().identity_case() == YandexQuery::IamAuth::kCurrentIam && disableCurrentIam) { + issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "current iam authorization is disabled")); + } + + if (!dataStreams.database_id() && !(dataStreams.endpoint() && dataStreams.database())) { + issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "content.setting.data_streams.{database_id or database,endpoint} field is not specified")); + } + break; + } + case YandexQuery::ConnectionSetting::kMonitoring: { + const YandexQuery::Monitoring monitoring = setting.monitoring(); + if (!monitoring.has_auth() || monitoring.auth().identity_case() == YandexQuery::IamAuth::IDENTITY_NOT_SET) { + issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "content.setting.monitoring.auth field is not specified")); + } + + if (monitoring.auth().identity_case() == YandexQuery::IamAuth::kCurrentIam && disableCurrentIam) { + issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "current iam authorization is disabled")); + } + + if (!monitoring.project()) { + issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "content.setting.monitoring.project field is not specified")); + } + + if (!monitoring.cluster()) { + issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "content.setting.monitoring.cluster field is not specified")); + } + break; + } + case YandexQuery::ConnectionSetting::CONNECTION_NOT_SET: { + issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "connection is not set")); + break; + } + // Do not add default. Adding a new connection should cause a compilation error + } + return issues; +} + +} // namespace NYq diff --git a/ydb/core/yq/libs/control_plane_storage/request_validators.h b/ydb/core/yq/libs/control_plane_storage/request_validators.h new file mode 100644 index 0000000000..0e91b2b03f --- /dev/null +++ b/ydb/core/yq/libs/control_plane_storage/request_validators.h @@ -0,0 +1,152 @@ +#pragma once + +#include <ydb/core/yq/libs/config/yq_issue.h> +#include <ydb/library/yql/public/issue/yql_issue.h> +#include <ydb/public/api/protos/yq.pb.h> + +#include <util/generic/fwd.h> +#include <util/generic/set.h> +#include <util/string/builder.h> +#include <util/string/cast.h> + +namespace NYq { + +template<class P> +NYql::TIssues ValidateEvent(P& ev, size_t maxSize) +{ + const auto& request = ev->Get()->Request; + const TString& scope = ev->Get()->Scope; + const TString& user = ev->Get()->User; + const TString& token = ev->Get()->Token; + const size_t byteSize = request.ByteSizeLong(); + + NYql::TIssues issues; + if (!scope) { + issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "scope is not specified")); + } + + if (!user) { + issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "user is empty")); + } + + if (!token) { + issues.AddIssue(MakeErrorIssue(TIssuesIds::ACCESS_DENIED, "token is empty")); + } + + if (byteSize > maxSize) { + issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, TStringBuilder{} << "request size exceeded " << maxSize << " bytes. Request size: " << byteSize)); + } + + TString error; + if (!request.validate(error)) { + issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, error)); + } + + return issues; +} + +template<typename T> +NYql::TIssues ValidateQuery(T& ev, size_t maxSize) +{ + NYql::TIssues issues = ValidateEvent(ev, maxSize); + auto& request = ev->Get()->Request; + const auto& content = request.content(); + + if (request.execute_mode() == YandexQuery::ExecuteMode::EXECUTE_MODE_UNSPECIFIED) { + issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "execute_mode field is not specified")); + } + + if (content.type() == YandexQuery::QueryContent::QUERY_TYPE_UNSPECIFIED) { + issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "type field is not specified")); + } + + if (content.acl().visibility() == YandexQuery::Acl::VISIBILITY_UNSPECIFIED) { + issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "acl.visibility field is not specified")); + } + + if (content.type() == YandexQuery::QueryContent::STREAMING && !request.has_disposition()) { + request.mutable_disposition()->mutable_fresh(); + } + + return issues; +} + +template<typename T> + NYql::TIssues ValidateBinding(T& ev, size_t maxSize, const TSet<YandexQuery::BindingSetting::BindingCase>& availableBindings) +{ + const auto& request = ev->Get()->Request; + NYql::TIssues issues = ValidateEvent(ev, maxSize); + + if (request.has_content()) { + const YandexQuery::BindingContent& content = request.content(); + if (content.acl().visibility() == YandexQuery::Acl::VISIBILITY_UNSPECIFIED) { + issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "binding.acl.visibility field is not specified")); + } + + if (content.name() != to_lower(content.name())) { + issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, TStringBuilder{} << "Incorrect binding name: " << content.name() << ". Please use only lower case")); + } + + if (!content.has_setting()) { + issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "binding.setting field is not specified")); + } + + const YandexQuery::BindingSetting& setting = content.setting(); + if (!availableBindings.contains(setting.binding_case())) { + issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "binding of the specified type is disabled")); + } + + switch (setting.binding_case()) { + case YandexQuery::BindingSetting::kDataStreams: { + const YandexQuery::DataStreamsBinding dataStreams = setting.data_streams(); + if (!dataStreams.has_schema()) { + issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "data streams with empty schema is forbidden")); + } + break; + } + case YandexQuery::BindingSetting::BINDING_NOT_SET: { + issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "binding is not set")); + break; + } + // Do not replace with default. Adding a new binding should cause a compilation error + case YandexQuery::BindingSetting::kObjectStorage: + break; + } + } else { + issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "content field is not specified")); + } + + return issues; +} + +NYql::TIssues ValidateConnectionSetting(const YandexQuery::ConnectionSetting& setting, const TSet<YandexQuery::ConnectionSetting::ConnectionCase>& availableConnections, bool disableCurrentIam, bool clickHousePasswordRequire = true); + +template<typename T> +NYql::TIssues ValidateConnection(T& ev, size_t maxSize, const TSet<YandexQuery::ConnectionSetting::ConnectionCase>& availableConnections, bool disableCurrentIam, bool clickHousePasswordRequire = true) +{ + const auto& request = ev->Get()->Request; + NYql::TIssues issues = ValidateEvent(ev, maxSize); + + if (!request.has_content()) { + issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "content field is not specified")); + } + + const YandexQuery::ConnectionContent& content = request.content(); + if (content.acl().visibility() == YandexQuery::Acl::VISIBILITY_UNSPECIFIED) { + issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "content.acl.visibility field is not specified")); + } + + if (content.name() != to_lower(content.name())) { + issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, TStringBuilder{} << "Incorrect connection name: " << content.name() << ". Please use only lower case")); + } + + if (!content.has_setting()) { + issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "content.setting field is not specified")); + } + + const YandexQuery::ConnectionSetting& setting = content.setting(); + issues.AddIssues(ValidateConnectionSetting(setting, availableConnections, disableCurrentIam, clickHousePasswordRequire)); + return issues; +} + +} // namespace NYq diff --git a/ydb/core/yq/libs/control_plane_storage/util.h b/ydb/core/yq/libs/control_plane_storage/util.h index 27ada7b03a..9cf5dfe656 100644 --- a/ydb/core/yq/libs/control_plane_storage/util.h +++ b/ydb/core/yq/libs/control_plane_storage/util.h @@ -1,12 +1,12 @@ #pragma once +#include <ydb/core/yq/libs/common/util.h> +#include <ydb/core/yq/libs/config/protos/control_plane_storage.pb.h> #include <ydb/core/yq/libs/control_plane_storage/events/events.h> -#include <util/datetime/base.h> - -#include <contrib/libs/protobuf/src/google/protobuf/timestamp.pb.h> +#include <google/protobuf/timestamp.pb.h> -#include <ydb/core/yq/libs/config/protos/control_plane_storage.pb.h> +#include <util/datetime/base.h> namespace NYq { @@ -38,14 +38,6 @@ TDuration GetDuration(const TString& value, const TDuration& defaultValue); NConfig::TControlPlaneStorageConfig FillDefaultParameters(NConfig::TControlPlaneStorageConfig config); -template<std::size_t K, typename T, std::size_t N> -auto CreateArray(const T(&list)[N]) -> std::array<T, K> { - static_assert(N == K, "not valid array size"); - std::array<T, K> result; - std::copy(std::begin(list), std::end(list), std::begin(result)); - return result; -} - bool DoesPingTaskUpdateQueriesTable(const TEvControlPlaneStorage::TEvPingTaskRequest* request); NYdb::TValue PackItemsToList(const TVector<NYdb::TValue>& items); diff --git a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage.cpp b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage.cpp index acc2fc3543..c1983795b3 100644 --- a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage.cpp +++ b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage.cpp @@ -21,18 +21,6 @@ void CollectDebugInfo(const TString& query, const TParams& params, TSession sess } } -inline YandexQuery::ConnectionSetting::ConnectionCase GetConnectionType(const TString& typeStr) { - YandexQuery::ConnectionSetting::ConnectionType type = YandexQuery::ConnectionSetting::CONNECTION_TYPE_UNSPECIFIED; - YandexQuery::ConnectionSetting::ConnectionType_Parse(typeStr, &type); - return static_cast<YandexQuery::ConnectionSetting::ConnectionCase>(type); -} - -inline YandexQuery::BindingSetting::BindingCase GetBindingType(const TString& typeStr) { - YandexQuery::BindingSetting::BindingType type = YandexQuery::BindingSetting::BINDING_TYPE_UNSPECIFIED; - YandexQuery::BindingSetting::BindingType_Parse(typeStr, &type); - return static_cast<YandexQuery::BindingSetting::BindingCase>(type); -} - ERetryErrorClass RetryFunc(const NYdb::TStatus& status) { return status.GetStatus() == NYdb::EStatus::OVERLOADED ? ERetryErrorClass::LongRetry : ERetryErrorClass::ShortRetry; } @@ -61,41 +49,6 @@ void TYdbControlPlaneStorageActor::Bootstrap() { Become(&TThis::StateFunc); } -TYdbControlPlaneStorageActor::TConfig::TConfig(const NConfig::TControlPlaneStorageConfig& config, const NConfig::TCommonConfig& common) - : Proto(FillDefaultParameters(config)) - , IdsPrefix(common.GetIdsPrefix()) - , IdempotencyKeyTtl(GetDuration(Proto.GetIdempotencyKeysTtl(), TDuration::Minutes(10))) - , AutomaticQueriesTtl(GetDuration(Proto.GetAutomaticQueriesTtl(), TDuration::Days(1))) - , ResultSetsTtl(GetDuration(Proto.GetResultSetsTtl(), TDuration::Days(1))) - , TaskLeaseTtl(GetDuration(Proto.GetTaskLeaseTtl(), TDuration::Seconds(30))) -{ - for (const auto& availableConnection: Proto.GetAvailableConnection()) { - AvailableConnections.insert(GetConnectionType(availableConnection)); - } - - for (const auto& availableBinding: Proto.GetAvailableBinding()) { - AvailableBindings.insert(GetBindingType(availableBinding)); - } - - for (const auto& mapping: Proto.GetRetryPolicyMapping()) { - auto& retryPolicy = mapping.GetPolicy(); - auto retryCount = retryPolicy.GetRetryCount(); - auto retryPeriod = GetDuration(retryPolicy.GetRetryPeriod(), TDuration::Hours(1)); - auto backoffPeriod = GetDuration(retryPolicy.GetBackoffPeriod(), TDuration::Zero()); - for (const auto statusCode: mapping.GetStatusCode()) { - RetryPolicies.emplace(statusCode, TRetryPolicyItem(retryCount, retryPeriod, backoffPeriod)); - } - } - - if (Proto.HasTaskLeaseRetryPolicy()) { - TaskLeaseRetryPolicy.RetryCount = Proto.GetTaskLeaseRetryPolicy().GetRetryCount(); - TaskLeaseRetryPolicy.RetryPeriod = GetDuration(Proto.GetTaskLeaseRetryPolicy().GetRetryPeriod(), TDuration::Days(1)); - } else { - TaskLeaseRetryPolicy.RetryCount = 20; - TaskLeaseRetryPolicy.RetryPeriod = TDuration::Days(1); - } -} - /* * Creating tables */ diff --git a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_impl.h b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_impl.h index 55e88913bb..ac2c03c73e 100644 --- a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_impl.h +++ b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_impl.h @@ -1,13 +1,15 @@ #pragma once +#include "config.h" #include "control_plane_storage.h" #include "control_plane_storage_counters.h" #include "exceptions.h" #include "extractors.h" -#include <ydb/core/yq/libs/control_plane_storage/internal/response_tasks.h> #include "probes.h" +#include "request_validators.h" #include "util.h" #include "validators.h" +#include <ydb/core/yq/libs/control_plane_storage/internal/response_tasks.h> #include <util/generic/guid.h> #include <util/system/yassert.h> @@ -217,25 +219,9 @@ class TYdbControlPlaneStorageActor : public NActors::TActorBootstrapped<TYdbCont } }; -private: - struct TConfig { - NConfig::TControlPlaneStorageConfig Proto; - TString IdsPrefix; - TDuration IdempotencyKeyTtl; - TDuration AutomaticQueriesTtl; - TDuration ResultSetsTtl; - TDuration TaskLeaseTtl; - TSet<YandexQuery::ConnectionSetting::ConnectionCase> AvailableConnections; - TSet<YandexQuery::BindingSetting::BindingCase> AvailableBindings; - THashMap<ui64, TRetryPolicyItem> RetryPolicies; - TRetryPolicyItem TaskLeaseRetryPolicy; - - TConfig(const NConfig::TControlPlaneStorageConfig& config, const NConfig::TCommonConfig& common); - }; - TCounters Counters; - TConfig Config; + ::NYq::TControlPlaneStorageConfig Config; TYdbConnectionPtr YdbConnection; @@ -327,174 +313,15 @@ public: template<typename T> NYql::TIssues ValidateConnection(T& ev, bool clickHousePasswordRequire = true) { - const auto& request = ev->Get()->Request; - NYql::TIssues issues = ValidateEvent(ev); - - if (!request.has_content()) { - issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "content field is not specified")); - } - - const YandexQuery::ConnectionContent& content = request.content(); - if (content.acl().visibility() == YandexQuery::Acl::VISIBILITY_UNSPECIFIED) { - issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "content.acl.visibility field is not specified")); - } - - if (content.name() != to_lower(content.name())) { - issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "Incorrect connection name: " + content.name() + ". Please use only lower case")); - } - - if (!content.has_setting()) { - issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "content.setting field is not specified")); - } - - const YandexQuery::ConnectionSetting& setting = content.setting(); - if (!Config.AvailableConnections.contains(setting.connection_case())) { - issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "connection of the specified type is disabled")); - } - - switch (setting.connection_case()) { - case YandexQuery::ConnectionSetting::kYdbDatabase: { - const YandexQuery::YdbDatabase database = setting.ydb_database(); - if (!database.has_auth() || database.auth().identity_case() == YandexQuery::IamAuth::IDENTITY_NOT_SET) { - issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "content.setting.ydb_database.auth field is not specified")); - } - - if (database.auth().identity_case() == YandexQuery::IamAuth::kCurrentIam && Config.Proto.GetDisableCurrentIam()) { - issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "current iam authorization is disabled")); - } - - if (!database.database_id() && !(database.endpoint() && database.database())) { - issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "content.setting.ydb_database.{database_id||database,endpoint} field is not specified")); - } - break; - } - case YandexQuery::ConnectionSetting::kClickhouseCluster: { - const YandexQuery::ClickHouseCluster ch = setting.clickhouse_cluster(); - if (!ch.has_auth() || ch.auth().identity_case() == YandexQuery::IamAuth::IDENTITY_NOT_SET) { - issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "content.setting.clickhouse_cluster.auth field is not specified")); - } - - if (ch.auth().identity_case() == YandexQuery::IamAuth::kCurrentIam && Config.Proto.GetDisableCurrentIam()) { - issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "current iam authorization is disabled")); - } - - if (!ch.database_id() && !(ch.host() && ch.port())) { - issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "content.setting.clickhouse_cluster.{database_id||host,port} field is not specified")); - } - - if (!ch.login()) { - issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "content.setting.clickhouse_cluster.login field is not specified")); - } - - if (!ch.password() && clickHousePasswordRequire) { - issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "content.setting.clickhouse_cluster.password field is not specified")); - } - break; - } - case YandexQuery::ConnectionSetting::kObjectStorage: { - const YandexQuery::ObjectStorageConnection objectStorage = setting.object_storage(); - if (!objectStorage.has_auth() || objectStorage.auth().identity_case() == YandexQuery::IamAuth::IDENTITY_NOT_SET) { - issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "content.setting.object_storage.auth field is not specified")); - } - - if (objectStorage.auth().identity_case() == YandexQuery::IamAuth::kCurrentIam && Config.Proto.GetDisableCurrentIam()) { - issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "current iam authorization is disabled")); - } - - if (!objectStorage.bucket()) { - issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "content.setting.object_storage.bucket field is not specified")); - } - break; - } - case YandexQuery::ConnectionSetting::kDataStreams: { - const YandexQuery::DataStreams dataStreams = setting.data_streams(); - if (!dataStreams.has_auth() || dataStreams.auth().identity_case() == YandexQuery::IamAuth::IDENTITY_NOT_SET) { - issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "content.setting.data_streams.auth field is not specified")); - } - - if (dataStreams.auth().identity_case() == YandexQuery::IamAuth::kCurrentIam && Config.Proto.GetDisableCurrentIam()) { - issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "current iam authorization is disabled")); - } - - if (!dataStreams.database_id() && !(dataStreams.endpoint() && dataStreams.database())) { - issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "content.setting.data_streams.{database_id||database,endpoint} field is not specified")); - } - break; - } - case YandexQuery::ConnectionSetting::kMonitoring: { - const YandexQuery::Monitoring monitoring = setting.monitoring(); - if (!monitoring.has_auth() || monitoring.auth().identity_case() == YandexQuery::IamAuth::IDENTITY_NOT_SET) { - issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "content.setting.monitoring.auth field is not specified")); - } - - if (monitoring.auth().identity_case() == YandexQuery::IamAuth::kCurrentIam && Config.Proto.GetDisableCurrentIam()) { - issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "current iam authorization is disabled")); - } - - if (!monitoring.project()) { - issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "content.setting.monitoring.project field is not specified")); - } - - if (!monitoring.cluster()) { - issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "content.setting.monitoring.cluster field is not specified")); - } - break; - } - case YandexQuery::ConnectionSetting::CONNECTION_NOT_SET: { - issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "connection is not set")); - break; - } - // Do not add default. Adding a new connection should cause a compilation error - } - return issues; + return ::NYq::ValidateConnection<T>(ev, Config.Proto.GetMaxRequestSize(), + Config.AvailableConnections, Config.Proto.GetDisableCurrentIam(), + clickHousePasswordRequire); } template<typename T> NYql::TIssues ValidateBinding(T& ev) { - const auto& request = ev->Get()->Request; - NYql::TIssues issues = ValidateEvent(ev); - - if (request.has_content()) { - const YandexQuery::BindingContent& content = request.content(); - if (content.acl().visibility() == YandexQuery::Acl::VISIBILITY_UNSPECIFIED) { - issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "binding.acl.visibility field is not specified")); - } - - if (content.name() != to_lower(content.name())) { - issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "Incorrect binding name: " + content.name() + ". Please use only lower case")); - } - - if (!content.has_setting()) { - issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "binding.setting field is not specified")); - } - - const YandexQuery::BindingSetting& setting = content.setting(); - if (!Config.AvailableBindings.contains(setting.binding_case())) { - issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "binding of the specified type is disabled")); - } - - switch (setting.binding_case()) { - case YandexQuery::BindingSetting::kDataStreams: { - const YandexQuery::DataStreamsBinding dataStreams = setting.data_streams(); - if (!dataStreams.has_schema()) { - issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "data streams with empty schema is forbidden")); - } - break; - } - case YandexQuery::BindingSetting::BINDING_NOT_SET: { - issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "binding is not set")); - break; - } - // Do not replace with default. Adding a new binding should cause a compilation error - case YandexQuery::BindingSetting::kObjectStorage: - break; - } - } else { - issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "binding field is not specified")); - } - - return issues; + return ::NYq::ValidateBinding<T>(ev, Config.Proto.GetMaxRequestSize(), Config.AvailableBindings); } void Handle(NMon::TEvHttpInfo::TPtr& ev) { @@ -512,61 +339,13 @@ public: template<typename T> NYql::TIssues ValidateQuery(T& ev) { - NYql::TIssues issues = ValidateEvent(ev); - auto& request = ev->Get()->Request; - const auto& content = request.content(); - - if (request.execute_mode() == YandexQuery::ExecuteMode::EXECUTE_MODE_UNSPECIFIED) { - issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "mode field is not specified")); - } - - if (content.type() == YandexQuery::QueryContent::QUERY_TYPE_UNSPECIFIED) { - issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "type field is not specified")); - } - - if (content.acl().visibility() == YandexQuery::Acl::VISIBILITY_UNSPECIFIED) { - issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "acl.visibility field is not specified")); - } - - if (content.type() == YandexQuery::QueryContent::STREAMING && !request.has_disposition()) { - request.mutable_disposition()->mutable_fresh(); - } - - return issues; + return ::NYq::ValidateQuery<T>(ev, Config.Proto.GetMaxRequestSize()); } template<class P> NYql::TIssues ValidateEvent(P& ev) { - const auto& request = ev->Get()->Request; - const TString scope = ev->Get()->Scope; - const TString user = ev->Get()->User; - const TString token = ev->Get()->Token; - const int byteSize = request.ByteSize(); - - NYql::TIssues issues; - if (!scope) { - issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "scope is not specified")); - } - - if (!user) { - issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "user is empty")); - } - - if (!token) { - issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "token is empty")); - } - - if (byteSize > static_cast<int>(Config.Proto.GetMaxRequestSize())) { - issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "request size exceeded " + ToString(request.ByteSize()) + " out of " + ToString(Config.Proto.GetMaxRequestSize()) + " bytes")); - } - - TString error; - if (!request.validate(error)) { - issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, error)); - } - - return issues; + return ::NYq::ValidateEvent<P>(ev, Config.Proto.GetMaxRequestSize()); } /* diff --git a/ydb/core/yq/libs/init/init.cpp b/ydb/core/yq/libs/init/init.cpp index 7946e71e8c..348a474997 100644 --- a/ydb/core/yq/libs/init/init.cpp +++ b/ydb/core/yq/libs/init/init.cpp @@ -75,13 +75,6 @@ void Init( actorRegistrator(NYq::ControlPlaneStorageServiceActorId(), controlPlaneStorage); } - if (protoConfig.GetTestConnection().GetEnabled()) { - auto testConnection = NYq::CreateTestConnectionActor( - protoConfig.GetTestConnection(), - appData->Counters->GetSubgroup("counters", "yq")->GetSubgroup("subsystem", "TestConnection")); - actorRegistrator(NYq::TestConnectionActorId(), testConnection); - } - if (protoConfig.GetControlPlaneProxy().GetEnabled()) { auto controlPlaneProxy = NYq::CreateControlPlaneProxyActor(protoConfig.GetControlPlaneProxy(), appData->Counters->GetSubgroup("counters", "yq")->GetSubgroup("subsystem", "ControlPlaneProxy")); @@ -135,6 +128,9 @@ void Init( } credentialsFactory = NYql::CreateSecuredServiceAccountCredentialsOverTokenAccessorFactory(tokenAccessorConfig.GetEndpoint(), tokenAccessorConfig.GetUseSsl(), caContent); + } + + if (protoConfig.GetPrivateApi().GetEnabled()) { RegisterDqPqReadActorFactory(*sourceActorFactory, yqSharedResources->UserSpaceYdbDriver, credentialsFactory, !protoConfig.GetReadActorsFactoryConfig().GetPqReadActorFactoryConfig().GetCookieCommitMode()); RegisterYdbReadActorFactory(*sourceActorFactory, yqSharedResources->UserSpaceYdbDriver, credentialsFactory); RegisterS3ReadActorFactory(*sourceActorFactory, credentialsFactory, @@ -205,6 +201,21 @@ void Init( auto httpProxy = NHttp::CreateHttpProxy(NMonitoring::TMetricRegistry::SharedInstance()); actorRegistrator(MakeYqlAnalyticsHttpProxyId(), httpProxy); + if (protoConfig.GetTestConnection().GetEnabled()) { + auto testConnection = NYq::CreateTestConnectionActor( + protoConfig.GetTestConnection(), + protoConfig.GetControlPlaneStorage(), + protoConfig.GetCommon(), + protoConfig.GetTokenAccessor(), + yqSharedResources, + credentialsFactory, + pqCmConnections, + appData->FunctionRegistry, + httpGateway, + appData->Counters->GetSubgroup("counters", "yq")->GetSubgroup("subsystem", "TestConnection")); + actorRegistrator(NYq::TestConnectionActorId(), testConnection); + } + if (protoConfig.GetPendingFetcher().GetEnabled()) { auto fetcher = CreatePendingFetcher( yqSharedResources, diff --git a/ydb/core/yq/libs/test_connection/CMakeLists.txt b/ydb/core/yq/libs/test_connection/CMakeLists.txt index 6c4b4aef64..adf2dec96f 100644 --- a/ydb/core/yq/libs/test_connection/CMakeLists.txt +++ b/ydb/core/yq/libs/test_connection/CMakeLists.txt @@ -15,11 +15,18 @@ target_link_libraries(yq-libs-test_connection PUBLIC contrib-libs-cxxsupp yutil library-cpp-lwtrace + cpp-xml-document libs-actors-logging libs-config-protos + yq-libs-control_plane_storage libs-test_connection-events + providers-solomon-async_io ) target_sources(yq-libs-test_connection PRIVATE - ${CMAKE_SOURCE_DIR}/ydb/core/yq/libs/test_connection/test_connection.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/yq/libs/test_connection/counters.cpp ${CMAKE_SOURCE_DIR}/ydb/core/yq/libs/test_connection/probes.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/yq/libs/test_connection/test_connection.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/yq/libs/test_connection/test_data_streams.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/yq/libs/test_connection/test_monitoring.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/yq/libs/test_connection/test_object_storage.cpp ) diff --git a/ydb/core/yq/libs/test_connection/counters.cpp b/ydb/core/yq/libs/test_connection/counters.cpp new file mode 100644 index 0000000000..92bd0d77bd --- /dev/null +++ b/ydb/core/yq/libs/test_connection/counters.cpp @@ -0,0 +1,21 @@ +#include "counters.h" + +namespace NYq { + +TTestConnectionRequestCounters::TTestConnectionRequestCounters(const TString& name) + : Name(name) +{} + +void TTestConnectionRequestCounters::Register(const NMonitoring::TDynamicCounterPtr& counters) { + auto requestCounters = counters->GetSubgroup("request", Name); + InFly = requestCounters->GetCounter("InFly", false); + Ok = requestCounters->GetCounter("Ok", true); + Error = requestCounters->GetCounter("Error", true); + LatencyMs = requestCounters->GetHistogram("LatencyMs", GetLatencyHistogramBuckets()); +} + +NMonitoring::IHistogramCollectorPtr TTestConnectionRequestCounters::GetLatencyHistogramBuckets() { + return NMonitoring::ExplicitHistogram({0, 1, 2, 5, 10, 20, 50, 100, 500, 1000, 2000, 5000, 10000, 30000, 50000, 500000}); +} + +} // NYq diff --git a/ydb/core/yq/libs/test_connection/counters.h b/ydb/core/yq/libs/test_connection/counters.h new file mode 100644 index 0000000000..9d0dc4b38b --- /dev/null +++ b/ydb/core/yq/libs/test_connection/counters.h @@ -0,0 +1,25 @@ +#pragma once + +#include <library/cpp/monlib/dynamic_counters/counters.h> + +namespace NYq { + +class TTestConnectionRequestCounters: public virtual TThrRefBase { +public: + const TString Name; + NMonitoring::TDynamicCounters::TCounterPtr InFly; + NMonitoring::TDynamicCounters::TCounterPtr Ok; + NMonitoring::TDynamicCounters::TCounterPtr Error; + NMonitoring::THistogramPtr LatencyMs; + + explicit TTestConnectionRequestCounters(const TString& name); + + void Register(const NMonitoring::TDynamicCounterPtr& counters); + +private: + static NMonitoring::IHistogramCollectorPtr GetLatencyHistogramBuckets(); +}; + +using TTestConnectionRequestCountersPtr = TIntrusivePtr<TTestConnectionRequestCounters>; + +} // NYq diff --git a/ydb/core/yq/libs/test_connection/probes.h b/ydb/core/yq/libs/test_connection/probes.h index b1f1d51abc..133b18160d 100644 --- a/ydb/core/yq/libs/test_connection/probes.h +++ b/ydb/core/yq/libs/test_connection/probes.h @@ -3,10 +3,22 @@ #include <library/cpp/lwtrace/all.h> #define YQ_TEST_CONNECTION_PROVIDER(PROBE, EVENT, GROUPS, TYPES, NAMES) \ - PROBE(TestConnectionRequest, \ + PROBE(TestDataStreamsConnectionRequest, \ GROUPS(), \ - TYPES(TString, TString, TDuration, i64, bool, bool), \ - NAMES("scope", "user", "latencyMs", "size", "success", "timeout")) \ + TYPES(TString, TString, TDuration, bool), \ + NAMES("scope", "user", "latencyMs", "success")) \ + PROBE(TestMonitoringConnectionRequest, \ + GROUPS(), \ + TYPES(TString, TString, TDuration, bool), \ + NAMES("scope", "user", "latencyMs", "success")) \ + PROBE(TestObjectStorageConnectionRequest, \ + GROUPS(), \ + TYPES(TString, TString, TDuration, bool), \ + NAMES("scope", "user", "latencyMs", "success")) \ + PROBE(TestUnsupportedConnectionRequest, \ + GROUPS(), \ + TYPES(TString, TString), \ + NAMES("scope", "user")) \ // YQ_TEST_CONNECTION_PROVIDER diff --git a/ydb/core/yq/libs/test_connection/request_validators.h b/ydb/core/yq/libs/test_connection/request_validators.h new file mode 100644 index 0000000000..ec52cbb2cf --- /dev/null +++ b/ydb/core/yq/libs/test_connection/request_validators.h @@ -0,0 +1,22 @@ +#pragma once + +#include <ydb/core/yq/libs/control_plane_storage/request_validators.h> + +namespace NYq { + +template<typename T> +NYql::TIssues ValidateTestConnection(T& ev, size_t maxSize, const TSet<YandexQuery::ConnectionSetting::ConnectionCase>& availableConnections, bool disableCurrentIam, bool clickHousePasswordRequire = true) +{ + const auto& request = ev->Get()->Request; + NYql::TIssues issues = ValidateEvent(ev, maxSize); + + if (!request.has_setting()) { + issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "content.setting field is not specified")); + } + + const YandexQuery::ConnectionSetting& setting = request.setting(); + issues.AddIssues(ValidateConnectionSetting(setting, availableConnections, disableCurrentIam, clickHousePasswordRequire)); + return issues; +} + +} // namespace NYq diff --git a/ydb/core/yq/libs/test_connection/test_connection.cpp b/ydb/core/yq/libs/test_connection/test_connection.cpp index b7148d0dbc..318dd6552f 100644 --- a/ydb/core/yq/libs/test_connection/test_connection.cpp +++ b/ydb/core/yq/libs/test_connection/test_connection.cpp @@ -1,27 +1,137 @@ #include "events/events.h" #include "probes.h" #include "test_connection.h" +#include "request_validators.h" + +#include <ydb/core/mon/mon.h> +#include <ydb/core/yq/libs/actors/database_resolver.h> +#include <ydb/core/yq/libs/actors/proxy.h> +#include <ydb/core/yq/libs/common/util.h> +#include <ydb/core/yq/libs/config/yq_issue.h> +#include <ydb/core/yq/libs/db_id_async_resolver_impl/db_async_resolver_impl.h> +#include <ydb/core/yq/libs/control_plane_storage/config.h> + +#include <ydb/library/security/util.h> #include <library/cpp/actors/core/actor_bootstrapped.h> #include <library/cpp/lwtrace/mon/mon_lwtrace.h> #include <library/cpp/monlib/service/pages/templates.h> -#include <ydb/core/mon/mon.h> -#include <ydb/core/yq/libs/config/yq_issue.h> - namespace NYq { +LWTRACE_USING(YQ_TEST_CONNECTION_PROVIDER); + using namespace NActors; class TTestConnectionActor : public NActors::TActorBootstrapped<TTestConnectionActor> { - NMonitoring::TDynamicCounterPtr Counters; + + enum ERequestTypeScope { + RTS_TEST_DATA_STREAMS_CONNECTION, + RTS_TEST_MONITORING_CONNECTION, + RTS_TEST_OBJECT_STORAGE_CONNECTION, + RTS_TEST_UNSUPPORTED_CONNECTION, + RTS_MAX, + }; + + class TCounters: public virtual TThrRefBase { + struct TMetricsScope { + TString CloudId; + TString Scope; + + bool operator<(const TMetricsScope& right) const { + return std::tie(CloudId, Scope) < std::tie(right.CloudId, right.Scope); + } + }; + + using TScopeCounters = std::array<TTestConnectionRequestCountersPtr, RTS_MAX>; + using TScopeCountersPtr = std::shared_ptr<TScopeCounters>; + + TMap<TMetricsScope, TScopeCountersPtr> ScopeCounters; + NMonitoring::TDynamicCounterPtr Counters; + + ERequestTypeScope ToType(YandexQuery::ConnectionSetting::ConnectionCase connectionCase) { + switch (connectionCase) { + case YandexQuery::ConnectionSetting::kDataStreams: + return RTS_TEST_DATA_STREAMS_CONNECTION; + case YandexQuery::ConnectionSetting::kObjectStorage: + return RTS_TEST_OBJECT_STORAGE_CONNECTION; + case YandexQuery::ConnectionSetting::kMonitoring: + return RTS_TEST_MONITORING_CONNECTION; + default: + return RTS_TEST_UNSUPPORTED_CONNECTION; + } + } + + public: + explicit TCounters(const NMonitoring::TDynamicCounterPtr& counters) + : Counters(counters) + {} + + TTestConnectionRequestCountersPtr GetScopeCounters(const TString& cloudId, const TString& scope, YandexQuery::ConnectionSetting::ConnectionCase connectionCase) { + ERequestTypeScope type = ToType(connectionCase); + TMetricsScope key{cloudId, scope}; + auto it = ScopeCounters.find(key); + if (it != ScopeCounters.end()) { + return (*it->second)[type]; + } + + auto scopeRequests = std::make_shared<TScopeCounters>(CreateArray<RTS_MAX, TTestConnectionRequestCountersPtr>({ + { MakeIntrusive<TTestConnectionRequestCounters>("TestDataStreamsConnection") }, + { MakeIntrusive<TTestConnectionRequestCounters>("TestMonitoringConnection") }, + { MakeIntrusive<TTestConnectionRequestCounters>("TestObjectStorageConnection") }, + { MakeIntrusive<TTestConnectionRequestCounters>("TestUnsupportedConnection") }, + })); + + auto scopeCounters = (cloudId ? Counters->GetSubgroup("cloud_id", cloudId) : Counters) + ->GetSubgroup("scope", scope); + + for (auto& request: *scopeRequests) { + request->Register(scopeCounters); + } + + ScopeCounters[key] = scopeRequests; + return (*scopeRequests)[type]; + } + }; + NConfig::TTestConnectionConfig Config; + ::NYq::TControlPlaneStorageConfig ControlPlaneStorageConfig; + NConfig::TCommonConfig CommonConfig; + NYq::TYqSharedResources::TPtr SharedResouces; + NYql::ISecuredServiceAccountCredentialsFactory::TPtr CredentialsFactory; + NPq::NConfigurationManager::IConnections::TPtr CmConnections; + const NKikimr::NMiniKQL::IFunctionRegistry* FunctionRegistry; + TCounters Counters; + NYq::TSigner::TPtr Signer; + TActorId DatabaseResolverActor; + std::shared_ptr<NYql::IDatabaseAsyncResolver> DbResolver; + NYql::IHTTPGateway::TPtr HttpGateway; public: - TTestConnectionActor(const NConfig::TTestConnectionConfig& config, const NMonitoring::TDynamicCounterPtr& counters) - : Counters(counters) - , Config(config) + TTestConnectionActor( + const NConfig::TTestConnectionConfig& config, + const NConfig::TControlPlaneStorageConfig& controlPlaneStorageConfig, + const NConfig::TCommonConfig& commonConfig, + const NConfig::TTokenAccessorConfig& tokenAccessorConfig, + const NYq::TYqSharedResources::TPtr& sharedResources, + const NYql::ISecuredServiceAccountCredentialsFactory::TPtr& credentialsFactory, + const NPq::NConfigurationManager::IConnections::TPtr& cmConnections, + const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, + const NYql::IHTTPGateway::TPtr& httpGateway, + const NMonitoring::TDynamicCounterPtr& counters) + : Config(config) + , ControlPlaneStorageConfig(controlPlaneStorageConfig, commonConfig) + , CommonConfig(commonConfig) + , SharedResouces(sharedResources) + , CredentialsFactory(credentialsFactory) + , CmConnections(cmConnections) + , FunctionRegistry(functionRegistry) + , Counters(counters) + , HttpGateway(httpGateway) { + if (tokenAccessorConfig.GetHmacSecretFile()) { + Signer = ::NYq::CreateSignerFromFile(tokenAccessorConfig.GetHmacSecretFile()); + } } static constexpr char ActorName[] = "YQ_TEST_CONNECTION"; @@ -31,6 +141,12 @@ public: NLwTraceMonPage::ProbeRegistry().AddProbesList(LWTRACE_GET_PROBES(YQ_TEST_CONNECTION_PROVIDER)); + DatabaseResolverActor = Register(NYq::CreateDatabaseResolver(NYq::MakeYqlAnalyticsHttpProxyId(), CredentialsFactory)); + DbResolver = std::make_shared<NYq::TDatabaseAsyncResolverImpl>( + NActors::TActivationContext::ActorSystem(), DatabaseResolverActor, + CommonConfig.GetYdbMvpCloudEndpoint(), CommonConfig.GetMdbGateway(), + CommonConfig.GetMdbTransformHost()); + Become(&TTestConnectionActor::StateFunc); } @@ -40,9 +156,56 @@ public: ) void Handle(TEvTestConnection::TEvTestConnectionRequest::TPtr& ev) { + NYql::TIssues issues = ValidateTestConnection(ev, ControlPlaneStorageConfig.Proto.GetMaxRequestSize(), ControlPlaneStorageConfig.AvailableConnections, ControlPlaneStorageConfig.Proto.GetDisableCurrentIam()); + const TString& cloudId = ev->Get()->CloudId; + const TString& scope = ev->Get()->Scope; + const TString& user = ev->Get()->User; + const TString& token = ev->Get()->Token; YandexQuery::TestConnectionRequest request = std::move(ev->Get()->Request); - TC_LOG_T("TestConnectionRequest: " << request.DebugString()); - Send(ev->Sender, new TEvTestConnection::TEvTestConnectionResponse(NYql::TIssues{MakeErrorIssue(TIssuesIds::INTERNAL_ERROR, "Unimplemented yet")}), 0, ev->Cookie); + TTestConnectionRequestCountersPtr requestCounters = Counters.GetScopeCounters(cloudId, scope, request.setting().connection_case()); + if (issues) { + requestCounters->Error->Inc(); + TC_LOG_D("TestConnectionRequest: validation failed " << scope << " " << user << " " << NKikimr::MaskTicket(token) << issues.ToOneLineString()); + Send(ev->Sender, new TEvTestConnection::TEvTestConnectionResponse(issues), 0, ev->Cookie); + return; + } + + TC_LOG_T("TestConnectionRequest: " << scope << " " << user << " " << NKikimr::MaskTicket(token) << request.DebugString()); + switch (request.setting().connection_case()) { + case YandexQuery::ConnectionSetting::kDataStreams: { + Register(CreateTestDataStreamsConnectionActor( + *request.mutable_setting()->mutable_data_streams(), + CommonConfig, DbResolver, ev->Sender, + ev->Cookie, SharedResouces, + CredentialsFactory, CmConnections, FunctionRegistry, + scope, user, token, + Signer, requestCounters)); + break; + } + case YandexQuery::ConnectionSetting::kObjectStorage: { + Register(CreateTestObjectStorageConnectionActor( + *request.mutable_setting()->mutable_object_storage(), + CommonConfig, ev->Sender, + ev->Cookie, CredentialsFactory, + HttpGateway, scope, user, token, + Signer, requestCounters)); + break; + } + case YandexQuery::ConnectionSetting::kMonitoring: { + Register(CreateTestMonitoringConnectionActor( + *request.mutable_setting()->mutable_monitoring(), + ev->Sender, ev->Cookie, CredentialsFactory, + scope, user, token, + Signer, requestCounters)); + break; + } + default: { + LWPROBE(TestUnsupportedConnectionRequest, scope, user); + requestCounters->Error->Inc(); + TC_LOG_E("TestConnectionRequest: unimplemented " << scope << " " << user << " " << NKikimr::MaskTicket(token) << request.DebugString()); + Send(ev->Sender, new TEvTestConnection::TEvTestConnectionResponse(NYql::TIssues{MakeErrorIssue(TIssuesIds::INTERNAL_ERROR, "Unimplemented yet")}), 0, ev->Cookie); + } + } } void Handle(NMon::TEvHttpInfo::TPtr& ev) { @@ -63,8 +226,21 @@ NActors::TActorId TestConnectionActorId() { return NActors::TActorId(0, name); } -NActors::IActor* CreateTestConnectionActor(const NConfig::TTestConnectionConfig& config, const NMonitoring::TDynamicCounterPtr& counters) { - return new TTestConnectionActor(config, counters); +NActors::IActor* CreateTestConnectionActor( + const NConfig::TTestConnectionConfig& config, + const NConfig::TControlPlaneStorageConfig& controlPlaneStorageConfig, + const NConfig::TCommonConfig& commonConfig, + const NConfig::TTokenAccessorConfig& tokenAccessorConfig, + const NYq::TYqSharedResources::TPtr& sharedResources, + const NYql::ISecuredServiceAccountCredentialsFactory::TPtr& credentialsFactory, + const NPq::NConfigurationManager::IConnections::TPtr& cmConnections, + const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, + const NYql::IHTTPGateway::TPtr& httpGateway, + const NMonitoring::TDynamicCounterPtr& counters) { + return new TTestConnectionActor(config, controlPlaneStorageConfig, commonConfig, + tokenAccessorConfig, sharedResources, + credentialsFactory, cmConnections, + functionRegistry, httpGateway, counters); } } // namespace NYq diff --git a/ydb/core/yq/libs/test_connection/test_connection.h b/ydb/core/yq/libs/test_connection/test_connection.h index 7be25e8e57..258b6f86ac 100644 --- a/ydb/core/yq/libs/test_connection/test_connection.h +++ b/ydb/core/yq/libs/test_connection/test_connection.h @@ -1,7 +1,14 @@ #pragma once +#include "counters.h" + #include <ydb/core/yq/libs/actors/logging/log.h> #include <ydb/core/yq/libs/config/protos/test_connection.pb.h> +#include <ydb/core/yq/libs/shared_resources/shared_resources.h> +#include <ydb/core/yq/libs/signer/signer.h> +#include <ydb/library/yql/providers/common/token_accessor/client/factory.h> +#include <ydb/library/yql/providers/pq/cm_client/interface/client.h> +#include <ydb/public/api/protos/yq.pb.h> #include <library/cpp/actors/core/actor.h> #include <library/cpp/monlib/dynamic_counters/counters.h> @@ -18,10 +25,71 @@ #define TC_LOG_T(s) \ LOG_YQ_TEST_CONNECTION_TRACE(s) +#define TC_LOG_AS_D(a, s) \ + LOG_YQ_TEST_CONNECTION_AS_DEBUG(a, s) +#define TC_LOG_AS_I(a, s) \ + LOG_YQ_TEST_CONNECTION_AS_INFO(a, s) +#define TC_LOG_AS_W(a, s) \ + LOG_YQ_TEST_CONNECTION_AS_WARN(a, s) +#define TC_LOG_AS_E(a, s) \ + LOG_YQ_TEST_CONNECTION_AS_ERROR(a, s) +#define TC_LOG_AS_T(a, s) \ + LOG_YQ_TEST_CONNECTION_AS_TRACE(a, s) + namespace NYq { NActors::TActorId TestConnectionActorId(); -NActors::IActor* CreateTestConnectionActor(const NConfig::TTestConnectionConfig& config, const NMonitoring::TDynamicCounterPtr& counters); +NActors::IActor* CreateTestConnectionActor( + const NConfig::TTestConnectionConfig& config, + const NConfig::TControlPlaneStorageConfig& controlPlaneStorageConfig, + const NConfig::TCommonConfig& commonConfig, + const NConfig::TTokenAccessorConfig& tokenAccessorConfig, + const NYq::TYqSharedResources::TPtr& sharedResources, + const NYql::ISecuredServiceAccountCredentialsFactory::TPtr& credentialsFactory, + const NPq::NConfigurationManager::IConnections::TPtr& cmConnections, + const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, + const NYql::IHTTPGateway::TPtr& httpGateway, + const NMonitoring::TDynamicCounterPtr& counters); + +NActors::IActor* CreateTestDataStreamsConnectionActor( + const YandexQuery::DataStreams& ds, + const NYq::NConfig::TCommonConfig& commonConfig, + const std::shared_ptr<NYql::IDatabaseAsyncResolver>& dbResolver, + const NActors::TActorId& sender, + ui64 cookie, + const NYq::TYqSharedResources::TPtr& sharedResources, + const NYql::ISecuredServiceAccountCredentialsFactory::TPtr& credentialsFactory, + const ::NPq::NConfigurationManager::IConnections::TPtr& cmConnections, + const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, + const TString& scope, + const TString& user, + const TString& token, + const NYq::TSigner::TPtr& signer, + const TTestConnectionRequestCountersPtr& counters); + +NActors::IActor* CreateTestObjectStorageConnectionActor( + const YandexQuery::ObjectStorageConnection& os, + const NYq::NConfig::TCommonConfig& commonConfig, + const NActors::TActorId& sender, + ui64 cookie, + const NYql::ISecuredServiceAccountCredentialsFactory::TPtr& credentialsFactory, + NYql::IHTTPGateway::TPtr gateway, + const TString& scope, + const TString& user, + const TString& token, + const NYq::TSigner::TPtr& signer, + const TTestConnectionRequestCountersPtr& counters); + +NActors::IActor* CreateTestMonitoringConnectionActor( + const YandexQuery::Monitoring& monitoring, + const NActors::TActorId& sender, + ui64 cookie, + const NYql::ISecuredServiceAccountCredentialsFactory::TPtr& credentialsFactory, + const TString& scope, + const TString& user, + const TString& token, + const NYq::TSigner::TPtr& signer, + const TTestConnectionRequestCountersPtr& counters); } // namespace NYq diff --git a/ydb/core/yq/libs/test_connection/test_data_streams.cpp b/ydb/core/yq/libs/test_connection/test_data_streams.cpp new file mode 100644 index 0000000000..1bcab3697d --- /dev/null +++ b/ydb/core/yq/libs/test_connection/test_data_streams.cpp @@ -0,0 +1,293 @@ +#include "events/events.h" +#include "probes.h" +#include "test_connection.h" + +#include <library/cpp/actors/core/actor_bootstrapped.h> + +#include <ydb/core/yq/libs/actors/clusters_from_connections.h> +#include <ydb/core/yq/libs/config/yq_issue.h> +#include <ydb/core/yq/libs/db_id_async_resolver_impl/db_async_resolver_impl.h> +#include <ydb/library/security/util.h> +#include <ydb/library/yql/providers/common/structured_token/yql_token_builder.h> +#include <ydb/library/yql/providers/pq/gateway/native/yql_pq_gateway.h> +#include <ydb/library/yql/providers/pq/provider/yql_pq_gateway.h> + +namespace { + +struct TEvPrivate { + enum EEv { + EvResolveDbResponse = EventSpaceBegin(NActors::TEvents::ES_PRIVATE), + EvOpenSessionResponse, + EvCheckListStreamsResponse, + EvEnd + }; + + static_assert(EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE), "expect EvEnd < EventSpaceEnd(TEvents::ES_PRIVATE)"); + + struct TEvResolveDbResponse : NActors::TEventLocal<TEvResolveDbResponse, EvResolveDbResponse> { + NYql::TDbResolverResponse Result; + + TEvResolveDbResponse(const NYql::TDbResolverResponse& result) + : Result(result) + {} + }; + + struct TEvOpenSessionResponse : NActors::TEventLocal<TEvOpenSessionResponse, EvOpenSessionResponse> { + bool IsSuccess = false; + TString ErrorMessage; + + TEvOpenSessionResponse(const TString& errorMessage) + : IsSuccess(false) + , ErrorMessage(errorMessage) + {} + + TEvOpenSessionResponse() + : IsSuccess(true) + {} + }; + + struct TEvCheckListStreamsResponse : NActors::TEventLocal<TEvCheckListStreamsResponse, EvCheckListStreamsResponse> { + bool IsSuccess = false; + TString ErrorMessage; + + TEvCheckListStreamsResponse(const TString& errorMessage) + : IsSuccess(false) + , ErrorMessage(errorMessage) + {} + + TEvCheckListStreamsResponse() + : IsSuccess(true) + {} + }; +}; + +} + +namespace NYq { + +LWTRACE_USING(YQ_TEST_CONNECTION_PROVIDER); + +using namespace NActors; + +class TTestDataStreamsConnectionActor : public NActors::TActorBootstrapped<TTestDataStreamsConnectionActor> { + inline static const TString SessionName = "test_connection_data_streams"; + + NYq::NConfig::TCommonConfig CommonConfig; + TActorId Sender; + ui64 Cookie; + TString Scope; + TString User; + TString Token; + TTestConnectionRequestCountersPtr Counters; + NYq::TYqSharedResources::TPtr SharedResources; + NYql::ISecuredServiceAccountCredentialsFactory::TPtr CredentialsFactory; + ::NPq::NConfigurationManager::IConnections::TPtr CmConnections; + const NKikimr::NMiniKQL::IFunctionRegistry* FunctionRegistry; + std::shared_ptr<NYql::IDatabaseAsyncResolver> DbResolver; + NYql::TPqClusterConfig ClusterConfig{}; + TString StructuredToken{}; + NYql::IPqGateway::TPtr Gateway{}; + const TInstant StartTime = TInstant::Now(); + +public: + TTestDataStreamsConnectionActor( + const YandexQuery::DataStreams& ds, + const NYq::NConfig::TCommonConfig& commonConfig, + const std::shared_ptr<NYql::IDatabaseAsyncResolver>& dbResolver, + const TActorId& sender, + ui64 cookie, + const NYq::TYqSharedResources::TPtr& sharedResources, + const NYql::ISecuredServiceAccountCredentialsFactory::TPtr& credentialsFactory, + const ::NPq::NConfigurationManager::IConnections::TPtr& cmConnections, + const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, + const TString& scope, + const TString& user, + const TString& token, + const NYq::TSigner::TPtr& signer, + const TTestConnectionRequestCountersPtr& counters) + : CommonConfig(commonConfig) + , Sender(sender) + , Cookie(cookie) + , Scope(scope) + , User(user) + , Token(token) + , Counters(counters) + , SharedResources(sharedResources) + , CredentialsFactory(credentialsFactory) + , CmConnections(cmConnections) + , FunctionRegistry(functionRegistry) + , DbResolver(dbResolver) + , ClusterConfig(CreateClusterConfig(SessionName, CommonConfig, Token, signer, ds)) + , StructuredToken(NYql::ComposeStructuredTokenJsonForServiceAccount(ClusterConfig.GetServiceAccountId(), ClusterConfig.GetServiceAccountIdSignature(), ClusterConfig.GetToken())) + { + Counters->InFly->Inc(); + } + + static constexpr char ActorName[] = "YQ_TEST_DATA_STREAMS_CONNECTION"; + + void Bootstrap() { + TC_LOG_D(Scope << " " << User << " " << NKikimr::MaskTicket(Token) << " Starting test data stream connection actor. Actor id: " << SelfId()); + TC_LOG_T(Scope << " " << User << " " << NKikimr::MaskTicket(Token) << " Structured token: " << StructuredToken << " service-account: " << ClusterConfig.GetServiceAccountId() << " signature: " << ClusterConfig.GetServiceAccountIdSignature() << " token: " << NKikimr::MaskTicket(ClusterConfig.GetToken())); + Become(&TTestDataStreamsConnectionActor::StateFunc); + SendResolveDatabaseId(); + } + + STRICT_STFUNC(StateFunc, + hFunc(TEvPrivate::TEvResolveDbResponse, Handler); + hFunc(TEvPrivate::TEvOpenSessionResponse, Handler); + hFunc(TEvPrivate::TEvCheckListStreamsResponse, Handler); + ) + +private: + void SendResolveDatabaseId() { + if (ClusterConfig.GetDatabase()) { + TC_LOG_T(Scope << " " << User << " " << NKikimr::MaskTicket(Token) << " Database from connection settings " << ClusterConfig.GetDatabase()); + SendOpenSession(); + return; + } + THashMap<std::pair<TString, NYql::DatabaseType>, NYql::TDatabaseAuth> ids; + ids[std::pair{ClusterConfig.GetDatabaseId(), NYql::DatabaseType::DataStreams}] = {StructuredToken, CommonConfig.GetUseBearerForYdb()}; + DbResolver->ResolveIds(ids).Apply([self=SelfId(), as=TActivationContext::ActorSystem()](const auto& future) { + try { + auto result = future.GetValue(); + as->Send(new IEventHandle(self, self, new TEvPrivate::TEvResolveDbResponse(result), 0)); + } catch (...) { + as->Send(new IEventHandle(self, self, new TEvPrivate::TEvResolveDbResponse(NYql::TDbResolverResponse{{}, false, NYql::TIssues{MakeErrorIssue(NYq::TIssuesIds::BAD_REQUEST, CurrentExceptionMessage())}}), 0)); + } + }); + } + + void Handler(TEvPrivate::TEvResolveDbResponse::TPtr& ev) { + const auto& response = ev->Get()->Result; + if (!response.Success) { + TC_LOG_T(Scope << " " << User << " " << NKikimr::MaskTicket(Token) << " Resolve datababse id " << ClusterConfig.GetDatabaseId() << " error " << response.Issues.ToOneLineString()); + ReplyError(response.Issues); + return; + } + + auto it = response.DatabaseId2Endpoint.find(std::pair{ClusterConfig.GetDatabaseId(), NYql::DatabaseType::DataStreams}); + if (it == response.DatabaseId2Endpoint.end()) { + TC_LOG_E(Scope << " " << User << " " << NKikimr::MaskTicket(Token) << " Test data streams connection: database is not found for database_id " << ClusterConfig.GetDatabaseId()); + ReplyError(TStringBuilder{} << "Test data streams connection: database is not found for database_id " << ClusterConfig.GetDatabaseId()); + return; + } + + TC_LOG_T(Scope << " " << User << " " << NKikimr::MaskTicket(Token) << " Resolve datababse id result: " << it->second.Database); + ClusterConfig.SetDatabase(it->second.Database); + ClusterConfig.SetEndpoint(it->second.Endpoint); + ClusterConfig.SetUseSsl(it->second.Secure); + SendOpenSession(); + } + + void SendOpenSession() { + Gateway = NYql::CreatePqNativeGateway(CreateGatewayServices()); + Gateway->OpenSession(SessionName, {}).Apply([self=SelfId(), as=TActivationContext::ActorSystem()](const auto& future) { + try { + future.TryRethrow(); + as->Send(new IEventHandle(self, self, new TEvPrivate::TEvOpenSessionResponse(), 0)); + } catch (...) { + as->Send(new IEventHandle(self, self, new TEvPrivate::TEvOpenSessionResponse(CurrentExceptionMessage()), 0)); + } + }); + } + + void Handler(TEvPrivate::TEvOpenSessionResponse::TPtr& ev) { + const auto& response = *ev->Get(); + if (!response.IsSuccess) { + TC_LOG_T(Scope << " " << User << " " << NKikimr::MaskTicket(Token) << " Open session error " << response.ErrorMessage); + ReplyError(response.ErrorMessage); + return; + } + TC_LOG_T(Scope << " " << User << " " << NKikimr::MaskTicket(Token) << " Open session: ok"); + SendCheckListStreams(); + } + + void SendCheckListStreams() { + Gateway->ListStreams(SessionName, SessionName, ClusterConfig.GetDatabase(), StructuredToken, 1).Apply([self=SelfId(), as=TActivationContext::ActorSystem()](const auto& future) { + try { + future.TryRethrow(); + as->Send(new IEventHandle(self, self, new TEvPrivate::TEvCheckListStreamsResponse(), 0)); + } catch (...) { + as->Send(new IEventHandle(self, self, new TEvPrivate::TEvCheckListStreamsResponse(CurrentExceptionMessage()), 0)); + } + }); + } + + void Handler(TEvPrivate::TEvCheckListStreamsResponse::TPtr& ev) { + const auto& response = *ev->Get(); + if (!response.IsSuccess) { + TC_LOG_T(Scope << " " << User << " " << NKikimr::MaskTicket(Token) << " Check list strems error " << response.ErrorMessage); + ReplyError(response.ErrorMessage); + return; + } + TC_LOG_T(Scope << " " << User << " " << NKikimr::MaskTicket(Token) << " Check list streams: ok"); + ReplyOk(); + } + + void DestroyActor(bool success = true) { + Counters->InFly->Dec(); + TDuration delta = TInstant::Now() - StartTime; + Counters->LatencyMs->Collect(delta.MilliSeconds()); + LWPROBE(TestDataStreamsConnectionRequest, Scope, User, delta, success); + PassAway(); + } + + void ReplyError(const NYql::TIssues& issues) { + Counters->Error->Inc(); + Send(Sender, new NYq::TEvTestConnection::TEvTestConnectionResponse(issues), 0, Cookie); + DestroyActor(false /* success */); + } + + void ReplyError(const TString& message) { + ReplyError(NYql::TIssues{MakeErrorIssue(NYq::TIssuesIds::BAD_REQUEST, message)}); + } + + void ReplyOk() { + Counters->Ok->Inc(); + Send(Sender, new NYq::TEvTestConnection::TEvTestConnectionResponse(YandexQuery::TestConnectionResult{}), 0, Cookie); + DestroyActor(); + } + + static NYql::TPqClusterConfig CreateClusterConfig(const TString& sessionName, const NYq::NConfig::TCommonConfig& commonConfig, const TString& token, const NYq::TSigner::TPtr& signer, const YandexQuery::DataStreams& ds) { + const auto& auth = ds.auth(); + const TString signedAccountId = signer && auth.has_service_account() ? signer->SignAccountId(auth.service_account().id()) : TString{}; + return NYq::CreatePqClusterConfig(sessionName, commonConfig.GetUseBearerForYdb(), token, signedAccountId, ds); + } + + NYql::TPqGatewayServices CreateGatewayServices() { + NYql::TPqGatewayConfig config; + *config.AddClusterMapping() = ClusterConfig; + NYql::TPqGatewayServices pqServices( + SharedResources->UserSpaceYdbDriver, + CmConnections, + CredentialsFactory, + std::make_shared<NYql::TPqGatewayConfig>(config), + FunctionRegistry + ); + return pqServices; + } +}; + +NActors::IActor* CreateTestDataStreamsConnectionActor( + const YandexQuery::DataStreams& ds, + const NYq::NConfig::TCommonConfig& commonConfig, + const std::shared_ptr<NYql::IDatabaseAsyncResolver>& dbResolver, + const TActorId& sender, + ui64 cookie, + const NYq::TYqSharedResources::TPtr& sharedResources, + const NYql::ISecuredServiceAccountCredentialsFactory::TPtr& credentialsFactory, + const ::NPq::NConfigurationManager::IConnections::TPtr& cmConnections, + const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, + const TString& scope, + const TString& user, + const TString& token, + const NYq::TSigner::TPtr& signer, + const TTestConnectionRequestCountersPtr& counters) { + return new TTestDataStreamsConnectionActor( + ds, commonConfig, dbResolver, sender, + cookie, sharedResources, credentialsFactory, + cmConnections, functionRegistry, + scope, user, token, signer, counters); +} + +} // namespace NYq diff --git a/ydb/core/yq/libs/test_connection/test_monitoring.cpp b/ydb/core/yq/libs/test_connection/test_monitoring.cpp new file mode 100644 index 0000000000..fd2e7a1e54 --- /dev/null +++ b/ydb/core/yq/libs/test_connection/test_monitoring.cpp @@ -0,0 +1,160 @@ +#include "probes.h" +#include "test_connection.h" + +#include <ydb/core/yq/libs/actors/clusters_from_connections.h> +#include <ydb/core/yq/libs/config/yq_issue.h> +#include <ydb/core/yq/libs/test_connection/events/events.h> +#include <ydb/library/security/util.h> + +#include <ydb/library/yql/providers/common/structured_token/yql_token_builder.h> +#include <ydb/library/yql/utils/url_builder.h> + +#include <ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.h> +#include <ydb/library/yql/utils/actors/http_sender_actor.h> + +#include <library/cpp/actors/core/actor_bootstrapped.h> + +namespace NYq { + +LWTRACE_USING(YQ_TEST_CONNECTION_PROVIDER); + +using namespace NActors; + +class TTestMonitoringConnectionActor : public NActors::TActorBootstrapped<TTestMonitoringConnectionActor> { + TActorId Sender; + TActorId HttpProxyId; + ui64 Cookie; + TString Scope; + TString User; + TString Token; + TTestConnectionRequestCountersPtr Counters; + NYql::ISecuredServiceAccountCredentialsFactory::TPtr CredentialsFactory; + NYq::TSigner::TPtr Signer; + NYql::TSolomonClusterConfig ClusterConfig; + const TInstant StartTime = TInstant::Now(); + +public: + TTestMonitoringConnectionActor( + const YandexQuery::Monitoring& monitoring, + const TActorId& sender, + ui64 cookie, + const NYql::ISecuredServiceAccountCredentialsFactory::TPtr& credentialsFactory, + const TString& scope, + const TString& user, + const TString& token, + const NYq::TSigner::TPtr& signer, + const TTestConnectionRequestCountersPtr& counters) + : Sender(sender) + , Cookie(cookie) + , Scope(scope) + , User(user) + , Token(token) + , Counters(counters) + , CredentialsFactory(credentialsFactory) + , Signer(signer) + , ClusterConfig(NYq::CreateSolomonClusterConfig({}, token, signer ? signer->SignAccountId(monitoring.auth().service_account().id()) : "", monitoring)) + { + Counters->InFly->Inc(); + } + + static constexpr char ActorName[] = "YQ_TEST_MONITORING_CONNECTION"; + + void Bootstrap() { + TC_LOG_D(Scope << " " << User << " " << NKikimr::MaskTicket(Token) << " Starting test monitoring connection actor. Actor id: " << SelfId()); + Become(&TTestMonitoringConnectionActor::StateFunc); + + HttpProxyId = Register(NHttp::CreateHttpProxy(NMonitoring::TMetricRegistry::SharedInstance())); + const NHttp::THttpOutgoingRequestPtr httpRequest = BuildSolomonRequest(); + const TActorId httpSenderId = Register(NYql::NDq::CreateHttpSenderActor(SelfId(), HttpProxyId)); + Send(httpSenderId, new NHttp::TEvHttpProxy::TEvHttpOutgoingRequest(httpRequest), /*flags=*/0, Cookie); + TC_LOG_T(Scope << " " << User << " " << NKikimr::MaskTicket(Token) << " send request " << httpRequest->Method << " " << httpRequest->Protocol << " " << httpRequest->Host << " " << httpRequest->URL << " " << httpRequest->Body); + } + + void FillAuth(NHttp::THttpOutgoingRequestPtr& httpRequest) { + const TString authorizationHeader = "Authorization"; + const auto structedToken = NYql::ComposeStructuredTokenJsonForServiceAccount(ClusterConfig.GetServiceAccountId(), ClusterConfig.GetServiceAccountIdSignature(), ClusterConfig.GetToken()); + const auto credentialsProviderFactory = CreateCredentialsProviderFactoryForStructuredToken(CredentialsFactory, structedToken); + const auto authToken = credentialsProviderFactory->CreateProvider()->GetAuthInfo(); + + switch (static_cast<NYql::NSo::NProto::ESolomonClusterType>(ClusterConfig.GetClusterType())) { + case NYql::NSo::NProto::ESolomonClusterType::CT_SOLOMON: + httpRequest->Set(authorizationHeader, "OAuth " + authToken); + break; + case NYql::NSo::NProto::ESolomonClusterType::CT_MONITORING: + httpRequest->Set(authorizationHeader, "Bearer " + authToken); + break; + default: + Y_ENSURE(false, "Invalid cluster type " << ToString<ui32>(ClusterConfig.GetClusterType())); + } + } + + NHttp::THttpOutgoingRequestPtr BuildSolomonRequest() { + const TString url = NYql::NDq::GetSolomonUrl(ClusterConfig.GetCluster(), + ClusterConfig.GetUseSsl(), + ClusterConfig.GetPath().GetProject(), + ClusterConfig.GetPath().GetCluster(), + {}, + static_cast<NYql::NSo::NProto::ESolomonClusterType>(ClusterConfig.GetClusterType())); + NHttp::THttpOutgoingRequestPtr httpRequest = NHttp::THttpOutgoingRequest::CreateRequestPost(url); + FillAuth(httpRequest); + httpRequest->Set<&NHttp::THttpRequest::ContentType>("application/json"); + httpRequest->Set<&NHttp::THttpRequest::Body>("{}"); + return httpRequest; + } + + STRICT_STFUNC(StateFunc, + hFunc(NYql::NDq::TEvHttpBase::TEvSendResult, Handle); + ) + + void Handle(NYql::NDq::TEvHttpBase::TEvSendResult::TPtr& ev) { + const auto* res = ev->Get(); + if (res->HttpIncomingResponse->Get()->Response->Status == "400") { + TC_LOG_T(Scope << " " << User << " " << NKikimr::MaskTicket(Token) << " ok " << res->HttpIncomingResponse->Get()->ToString()); + ReplyOk(); + return; + } + + const TString& error = res->HttpIncomingResponse->Get()->GetError(); + TC_LOG_T(Scope << " " << User << " " << NKikimr::MaskTicket(Token) << " access problem " << res->HttpIncomingResponse->Get()->ToString() << " " << error); + ReplyError(error); + } + + void DestroyActor(bool isSuccess) { + Counters->InFly->Dec(); + TDuration delta = TInstant::Now() - StartTime; + Counters->LatencyMs->Collect(delta.MilliSeconds()); + LWPROBE(TestMonitoringConnectionRequest, Scope, User, delta, isSuccess); + Send(HttpProxyId, new NActors::TEvents::TEvPoison()); + PassAway(); + } + + void ReplyError(const TString& message) { + Counters->Error->Inc(); + Send(Sender, new NYq::TEvTestConnection::TEvTestConnectionResponse(NYql::TIssues{MakeErrorIssue(NYq::TIssuesIds::BAD_REQUEST, message)}), Cookie); + DestroyActor(false /* success */); + } + + void ReplyOk() { + Counters->Ok->Inc(); + Send(Sender, new NYq::TEvTestConnection::TEvTestConnectionResponse(YandexQuery::TestConnectionResult{}), Cookie); + DestroyActor(true /* success */); + } +}; + +NActors::IActor* CreateTestMonitoringConnectionActor( + const YandexQuery::Monitoring& monitoring, + const TActorId& sender, + ui64 cookie, + const NYql::ISecuredServiceAccountCredentialsFactory::TPtr& credentialsFactory, + const TString& scope, + const TString& user, + const TString& token, + const NYq::TSigner::TPtr& signer, + const TTestConnectionRequestCountersPtr& counters) { + return new TTestMonitoringConnectionActor( + monitoring, sender, + cookie, credentialsFactory, + scope, user, token, signer, counters); +} + +} // namespace NYq diff --git a/ydb/core/yq/libs/test_connection/test_object_storage.cpp b/ydb/core/yq/libs/test_connection/test_object_storage.cpp new file mode 100644 index 0000000000..80832654cf --- /dev/null +++ b/ydb/core/yq/libs/test_connection/test_object_storage.cpp @@ -0,0 +1,223 @@ +#include "events/events.h" +#include "probes.h" +#include "test_connection.h" + +#include <library/cpp/actors/core/actor_bootstrapped.h> + +#include <ydb/core/yq/libs/actors/clusters_from_connections.h> +#include <ydb/core/yq/libs/config/yq_issue.h> +#include <ydb/library/security/util.h> + +#include <ydb/library/yql/providers/s3/provider/yql_s3_provider_impl.h> +#include <ydb/library/yql/providers/common/structured_token/yql_token_builder.h> +#include <ydb/library/yql/utils/url_builder.h> + +#ifdef THROW +#undef THROW +#endif +#include <library/cpp/xml/document/xml-document.h> + +namespace { + +struct TEvPrivate { + enum EEv { + EvDiscoveryResponse = EventSpaceBegin(NActors::TEvents::ES_PRIVATE), + EvEnd + }; + + static_assert(EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE), "expect EvEnd < EventSpaceEnd(TEvents::ES_PRIVATE)"); + + struct TEvDiscoveryResponse : NActors::TEventLocal<TEvDiscoveryResponse, EvDiscoveryResponse> { + bool IsSuccess = false; + TString ErrorMessage; + + TEvDiscoveryResponse(const TString& errorMessage) + : IsSuccess(false) + , ErrorMessage(errorMessage) + {} + + TEvDiscoveryResponse() + : IsSuccess(true) + {} + }; +}; + +} + +namespace NYq { + +LWTRACE_USING(YQ_TEST_CONNECTION_PROVIDER); + +using namespace NActors; + +class TTestObjectStorageConnectionActor : public NActors::TActorBootstrapped<TTestObjectStorageConnectionActor> { + TTestConnectionRequestCountersPtr Counters; + TActorId Sender; + TString Scope; + TString User; + TString Token; + ui64 Cookie; + NYql::IHTTPGateway::TPtr Gateway; + NYql::ISecuredServiceAccountCredentialsFactory::TPtr CredentialsFactory; + NYql::TS3ClusterConfig ClusterConfig; + + const TInstant StartTime = TInstant::Now(); + +public: + TTestObjectStorageConnectionActor( + const YandexQuery::ObjectStorageConnection& os, + const NYq::NConfig::TCommonConfig& commonConfig, + const TActorId& sender, + ui64 cookie, + const NYql::ISecuredServiceAccountCredentialsFactory::TPtr& credentialsFactory, + NYql::IHTTPGateway::TPtr gateway, + const TString& scope, + const TString& user, + const TString& token, + const NYq::TSigner::TPtr& signer, + const TTestConnectionRequestCountersPtr& counters) + : Counters(counters) + , Sender(sender) + , Scope(scope) + , User(user) + , Token(token) + , Cookie(cookie) + , Gateway(gateway) + , CredentialsFactory(credentialsFactory) + , ClusterConfig(NYq::CreateS3ClusterConfig({}, token, commonConfig.GetObjectStorageEndpoint(), signer ? signer->SignAccountId(os.auth().service_account().id()) : "", os)) + { + Counters->InFly->Inc(); + } + + static constexpr char ActorName[] = "YQ_TEST_OBJECT_STORAGE_CONNECTION"; + + void Bootstrap() { + Become(&TTestObjectStorageConnectionActor::StateFunc); + TC_LOG_D(Scope << " " << User << " " << NKikimr::MaskTicket(Token) << " Starting test object storage connection actor. Actor id: " << SelfId()); + SendDiscover(); + } + + STRICT_STFUNC(StateFunc, + hFunc(TEvPrivate::TEvDiscoveryResponse, Handle); + ) + + void Handle(TEvPrivate::TEvDiscoveryResponse::TPtr& ev) { + const auto& response = *ev->Get(); + if (response.IsSuccess) { + ReplyOk(); + } else { + ReplyError(response.ErrorMessage); + } + } + +private: + static void SendError(TActorId self, NActors::TActorSystem* as, const TString& errorMessage) { + as->Send(new IEventHandle(self, self, new TEvPrivate::TEvDiscoveryResponse(errorMessage), 0)); + } + + static void SendOk(TActorId self, NActors::TActorSystem* as) { + as->Send(new IEventHandle(self, self, new TEvPrivate::TEvDiscoveryResponse(), 0)); + } + + static void DiscoveryCallback(NYql::IHTTPGateway::TResult&& result, TActorId self, NActors::TActorSystem* as) { + switch (result.index()) { + case 0U: try { + const NXml::TDocument xml(std::get<NYql::IHTTPGateway::TContent>(std::move(result)).Extract(), NXml::TDocument::String); + if (const auto& root = xml.Root(); root.Name() == "Error") { + const auto& code = root.Node("Code", true).Value<TString>(); + const auto& message = root.Node("Message", true).Value<TString>(); + SendError(self, as, TStringBuilder() << message << ", code: " << code); + return; + } else if (root.Name() != "ListBucketResult") { + SendError(self, as, TStringBuilder() << "Unexpected response '" << root.Name() << "' on discovery."); + return; + } else { + break; + } + } catch (const std::exception& ex) { + SendError(self, as, TStringBuilder() << "Exception occurred: " << ex.what()); + return; + } + case 1U: + SendError(self, as, TStringBuilder() << "Issues occurred: " << std::get<NYql::TIssues>(result).ToString()); + return; + default: + SendError(self, as, TStringBuilder() << "Undefined variant index: " << result.index()); + return; + } + SendOk(self, as); + } + + static ERetryErrorClass RetryS3SlowDown(long httpResponseCode) { + return httpResponseCode == 503 ? ERetryErrorClass::LongRetry : ERetryErrorClass::NoRetry; // S3 Slow Down == 503 + } + + void SendDiscover() { + const auto structedToken = NYql::ComposeStructuredTokenJsonForServiceAccount(ClusterConfig.GetServiceAccountId(), ClusterConfig.GetServiceAccountIdSignature(), ClusterConfig.GetToken()); + const auto credentialsProviderFactory = CreateCredentialsProviderFactoryForStructuredToken(CredentialsFactory, structedToken); + const auto authToken = credentialsProviderFactory->CreateProvider()->GetAuthInfo(); + + NYql::IHTTPGateway::THeaders headers; + if (authToken) { + headers.push_back(TString("X-YaCloud-SubjectToken:") += authToken); + } + + const auto retryPolicy = IRetryPolicy<long>::GetExponentialBackoffPolicy(RetryS3SlowDown); + + NYql::TUrlBuilder urlBuilder(ClusterConfig.GetUrl()); + const auto url = urlBuilder.AddUrlParam("list-type", "2") + .AddUrlParam("max-keys", "1") + .Build(); + + Gateway->Download( + url, + headers, + 0U, + std::bind(&DiscoveryCallback, std::placeholders::_1, SelfId(), TActivationContext::ActorSystem()), + /*data=*/"", + retryPolicy + ); + } + + void DestroyActor(bool success = true) { + Counters->InFly->Dec(); + TDuration delta = TInstant::Now() - StartTime; + Counters->LatencyMs->Collect(delta.MilliSeconds()); + LWPROBE(TestObjectStorageConnectionRequest, Scope, User, delta, success); + PassAway(); + } + + void ReplyError(const TString& message) { + TC_LOG_D(Scope << " " << User << " " << NKikimr::MaskTicket(Token) << " Invalid access for object storage connection: " << message); + Counters->Error->Inc(); + Send(Sender, new NYq::TEvTestConnection::TEvTestConnectionResponse(NYql::TIssues{MakeErrorIssue(NYq::TIssuesIds::BAD_REQUEST, message)}), 0, Cookie); + DestroyActor(false /* success */); + } + + void ReplyOk() { + TC_LOG_T(Scope << " " << User << " " << NKikimr::MaskTicket(Token) << " Access is valid for object storage connection"); + Counters->Ok->Inc(); + Send(Sender, new NYq::TEvTestConnection::TEvTestConnectionResponse(YandexQuery::TestConnectionResult{}), 0, Cookie); + DestroyActor(); + } +}; + +NActors::IActor* CreateTestObjectStorageConnectionActor( + const YandexQuery::ObjectStorageConnection& os, + const NYq::NConfig::TCommonConfig& commonConfig, + const TActorId& sender, + ui64 cookie, + const NYql::ISecuredServiceAccountCredentialsFactory::TPtr& credentialsFactory, + NYql::IHTTPGateway::TPtr gateway, + const TString& scope, + const TString& user, + const TString& token, + const NYq::TSigner::TPtr& signer, + const TTestConnectionRequestCountersPtr& counters) { + return new TTestObjectStorageConnectionActor( + os, commonConfig, sender, + cookie, credentialsFactory, gateway, + scope, user, token, signer, counters); +} + +} // namespace NYq diff --git a/ydb/library/yql/providers/pq/gateway/dummy/yql_pq_dummy_gateway.cpp b/ydb/library/yql/providers/pq/gateway/dummy/yql_pq_dummy_gateway.cpp index b1d64befb3..b04f11ac02 100644 --- a/ydb/library/yql/providers/pq/gateway/dummy/yql_pq_dummy_gateway.cpp +++ b/ydb/library/yql/providers/pq/gateway/dummy/yql_pq_dummy_gateway.cpp @@ -42,6 +42,11 @@ NPq::NConfigurationManager::TAsyncDescribePathResult TDummyPqGateway::DescribePa } } +NThreading::TFuture<IPqGateway::TListStreams> TDummyPqGateway::ListStreams(const TString& sessionId, const TString& cluster, const TString& database, const TString& token, ui32 limit, const TString& exclusiveStartStreamName) { + Y_UNUSED(sessionId, cluster, database, token, limit, exclusiveStartStreamName); + return NThreading::MakeFuture<IPqGateway::TListStreams>(); +} + TDummyPqGateway& TDummyPqGateway::AddDummyTopic(const TDummyTopic& topic) { with_lock (Mutex) { Y_ENSURE(topic.Cluster); diff --git a/ydb/library/yql/providers/pq/gateway/dummy/yql_pq_dummy_gateway.h b/ydb/library/yql/providers/pq/gateway/dummy/yql_pq_dummy_gateway.h index 753c849c20..dbf6f6b52b 100644 --- a/ydb/library/yql/providers/pq/gateway/dummy/yql_pq_dummy_gateway.h +++ b/ydb/library/yql/providers/pq/gateway/dummy/yql_pq_dummy_gateway.h @@ -34,7 +34,20 @@ public: NThreading::TFuture<void> OpenSession(const TString& sessionId, const TString& username) override; void CloseSession(const TString& sessionId) override; - NPq::NConfigurationManager::TAsyncDescribePathResult DescribePath(const TString& sessionId, const TString& cluster, const TString& database, const TString& path, const TString& token) override; + NPq::NConfigurationManager::TAsyncDescribePathResult DescribePath( + const TString& sessionId, + const TString& cluster, + const TString& database, + const TString& path, + const TString& token) override; + + NThreading::TFuture<TListStreams> ListStreams( + const TString& sessionId, + const TString& cluster, + const TString& database, + const TString& token, + ui32 limit, + const TString& exclusiveStartStreamName = {}) override; void UpdateClusterConfigs( const TString& clusterName, diff --git a/ydb/library/yql/providers/pq/gateway/native/CMakeLists.txt b/ydb/library/yql/providers/pq/gateway/native/CMakeLists.txt index 9ec4274875..3c8f646c99 100644 --- a/ydb/library/yql/providers/pq/gateway/native/CMakeLists.txt +++ b/ydb/library/yql/providers/pq/gateway/native/CMakeLists.txt @@ -13,6 +13,7 @@ target_link_libraries(pq-gateway-native PUBLIC yutil common-token_accessor-client library-yql-utils + cpp-client-ydb_datastreams cpp-client-ydb_driver cpp-client-ydb_persqueue_core providers-common-metrics diff --git a/ydb/library/yql/providers/pq/gateway/native/yql_pq_gateway.cpp b/ydb/library/yql/providers/pq/gateway/native/yql_pq_gateway.cpp index bd89565050..86673da5a1 100644 --- a/ydb/library/yql/providers/pq/gateway/native/yql_pq_gateway.cpp +++ b/ydb/library/yql/providers/pq/gateway/native/yql_pq_gateway.cpp @@ -26,6 +26,14 @@ public: const TString& path, const TString& token) override; + NThreading::TFuture<TListStreams> ListStreams( + const TString& sessionId, + const TString& cluster, + const TString& database, + const TString& token, + ui32 limit, + const TString& exclusiveStartStreamName = {}) override; + void UpdateClusterConfigs( const TString& clusterName, const TString& endpoint, @@ -120,6 +128,10 @@ NPq::NConfigurationManager::TAsyncDescribePathResult TPqNativeGateway::DescribeP return GetExistingSession(sessionId)->DescribePath(cluster, database, path, token); } +NThreading::TFuture<IPqGateway::TListStreams> TPqNativeGateway::ListStreams(const TString& sessionId, const TString& cluster, const TString& database, const TString& token, ui32 limit, const TString& exclusiveStartStreamName) { + return GetExistingSession(sessionId)->ListStreams(cluster, database, token, limit, exclusiveStartStreamName); +} + IPqGateway::TPtr CreatePqNativeGateway(const TPqGatewayServices& services) { return MakeIntrusive<TPqNativeGateway>(services); } diff --git a/ydb/library/yql/providers/pq/gateway/native/yql_pq_session.cpp b/ydb/library/yql/providers/pq/gateway/native/yql_pq_session.cpp index cd536762e4..18d1ae0e47 100644 --- a/ydb/library/yql/providers/pq/gateway/native/yql_pq_session.cpp +++ b/ydb/library/yql/providers/pq/gateway/native/yql_pq_session.cpp @@ -25,6 +25,17 @@ NYdb::NPersQueue::TPersQueueClientSettings GetYdbPqClientOptions(const TString& return opts; } + +NYdb::TCommonClientSettings GetDsClientOptions(const TString& database, const NYql::TPqClusterConfig& cfg, std::shared_ptr<NYdb::ICredentialsProviderFactory> credentialsProviderFactory) { + NYdb::TCommonClientSettings opts; + opts + .DiscoveryEndpoint(cfg.GetEndpoint()) + .Database(database) + .EnableSsl(cfg.GetUseSsl()) + .CredentialsProviderFactory(credentialsProviderFactory); + + return opts; +} } const NPq::NConfigurationManager::IClient::TPtr& TPqSession::GetConfigManagerClient(const TString& cluster, const NYql::TPqClusterConfig& cfg, std::shared_ptr<NYdb::ICredentialsProviderFactory> credentialsProviderFactory) { @@ -43,6 +54,14 @@ NYdb::NPersQueue::TPersQueueClient& TPqSession::GetYdbPqClient(const TString& cl return ClusterYdbPqClients.emplace(cluster, NYdb::NPersQueue::TPersQueueClient(YdbDriver, GetYdbPqClientOptions(database, cfg, credentialsProviderFactory))).first->second; } +NYdb::NDataStreams::V1::TDataStreamsClient& TPqSession::GetDsClient(const TString& cluster, const TString& database, const NYql::TPqClusterConfig& cfg, std::shared_ptr<NYdb::ICredentialsProviderFactory> credentialsProviderFactory) { + const auto clientIt = ClusterDsClients.find(cluster); + if (clientIt != ClusterDsClients.end()) { + return clientIt->second; + } + return ClusterDsClients.emplace(cluster, NYdb::NDataStreams::V1::TDataStreamsClient(YdbDriver, GetDsClientOptions(database, cfg, credentialsProviderFactory))).first->second; +} + NPq::NConfigurationManager::TAsyncDescribePathResult TPqSession::DescribePath(const TString& cluster, const TString& database, const TString& path, const TString& token) { const auto* config = ClusterConfigs->FindPtr(cluster); if (!config) { @@ -78,4 +97,49 @@ NPq::NConfigurationManager::TAsyncDescribePathResult TPqSession::DescribePath(co } } +NThreading::TFuture<IPqGateway::TListStreams> TPqSession::ListStreams(const TString& cluster, const TString& database, const TString& token, ui32 limit, const TString& exclusiveStartStreamName) { + const auto* config = ClusterConfigs->FindPtr(cluster); + if (!config) { + ythrow yexception() << "Pq cluster `" << cluster << "` does not exist"; + } + + YQL_ENSURE(config->GetEndpoint(), "Can't get list topics for " << cluster << ": no endpoint"); + + std::shared_ptr<NYdb::ICredentialsProviderFactory> credentialsProviderFactory = CreateCredentialsProviderFactoryForStructuredToken(CredentialsFactory, token, config->GetAddBearerToToken()); + with_lock (Mutex) { + if (config->GetClusterType() == TPqClusterConfig::CT_PERS_QUEUE) { + const NPq::NConfigurationManager::IClient::TPtr& client = GetConfigManagerClient(cluster, *config, credentialsProviderFactory); + if (!client) { + NThreading::TPromise<IPqGateway::TListStreams> result = NThreading::NewPromise<IPqGateway::TListStreams>(); + result.SetException( + std::make_exception_ptr( + yexception() + << "Pq configuration manager is not supported")); + return result; + } + return client->DescribePath("/").Apply([](const auto& future) { + auto response = future.GetValue(); + if (!response.IsPath()) { + throw yexception() << "response does not contain object of type path"; + } + return IPqGateway::TListStreams{}; + }); + } + + return GetDsClient(cluster, database, *config, credentialsProviderFactory) + .ListStreams(NYdb::NDataStreams::V1::TListStreamsSettings{ .Limit_ = limit, .ExclusiveStartStreamName_ = exclusiveStartStreamName}) + .Apply([](const auto& future) { + auto& response = future.GetValue(); + if (!response.IsSuccess()) { + throw yexception() << response.GetIssues().ToString(); + } + const auto& result = response.GetResult(); + IPqGateway::TListStreams listStrems; + listStrems.Names.insert(listStrems.Names.end(), result.stream_names().begin(), result.stream_names().end()); + return listStrems; + + }); + } +} + } // namespace NYql diff --git a/ydb/library/yql/providers/pq/gateway/native/yql_pq_session.h b/ydb/library/yql/providers/pq/gateway/native/yql_pq_session.h index 23129837f0..9ab98b22e0 100644 --- a/ydb/library/yql/providers/pq/gateway/native/yql_pq_session.h +++ b/ydb/library/yql/providers/pq/gateway/native/yql_pq_session.h @@ -1,10 +1,13 @@ #pragma once +#include <ydb/public/sdk/cpp/client/ydb_datastreams/datastreams.h> #include <ydb/public/sdk/cpp/client/ydb_persqueue_core/persqueue.h> -#include <ydb/library/yql/providers/common/token_accessor/client/factory.h> #include <ydb/library/yql/providers/common/proto/gateways_config.pb.h> +#include <ydb/library/yql/providers/common/token_accessor/client/factory.h> #include <ydb/library/yql/providers/pq/cm_client/interface/client.h> +#include <ydb/library/yql/providers/pq/provider/yql_pq_gateway.h> + #include <util/generic/ptr.h> #include <util/system/mutex.h> @@ -35,9 +38,12 @@ public: NPq::NConfigurationManager::TAsyncDescribePathResult DescribePath(const TString& cluster, const TString& database, const TString& path, const TString& token); + NThreading::TFuture<IPqGateway::TListStreams> ListStreams(const TString& cluster, const TString& database, const TString& token, ui32 limit, const TString& exclusiveStartStreamName); + private: const NPq::NConfigurationManager::IClient::TPtr& GetConfigManagerClient(const TString& cluster, const NYql::TPqClusterConfig& cfg, std::shared_ptr<NYdb::ICredentialsProviderFactory> credentialsProviderFactory); NYdb::NPersQueue::TPersQueueClient& GetYdbPqClient(const TString& cluster, const TString& database, const NYql::TPqClusterConfig& cfg, std::shared_ptr<NYdb::ICredentialsProviderFactory> credentialsProviderFactory); + NYdb::NDataStreams::V1::TDataStreamsClient& GetDsClient(const TString& cluster, const TString& database, const NYql::TPqClusterConfig& cfg, std::shared_ptr<NYdb::ICredentialsProviderFactory> credentialsProviderFactory); private: const TString SessionId; @@ -50,6 +56,7 @@ private: TMutex Mutex; THashMap<TString, NPq::NConfigurationManager::IClient::TPtr> ClusterCmClients; // Cluster -> CM Client. THashMap<TString, NYdb::NPersQueue::TPersQueueClient> ClusterYdbPqClients; // Cluster -> PQ Client. + THashMap<TString, NYdb::NDataStreams::V1::TDataStreamsClient> ClusterDsClients; // Cluster -> DS Client }; } // namespace NYql diff --git a/ydb/library/yql/providers/pq/provider/yql_pq_gateway.h b/ydb/library/yql/providers/pq/provider/yql_pq_gateway.h index 118b37c012..cee8cfffac 100644 --- a/ydb/library/yql/providers/pq/provider/yql_pq_gateway.h +++ b/ydb/library/yql/providers/pq/provider/yql_pq_gateway.h @@ -1,6 +1,7 @@ #pragma once #include <ydb/library/yql/providers/common/proto/gateways_config.pb.h> #include <ydb/library/yql/providers/pq/cm_client/interface/client.h> +#include <ydb/public/sdk/cpp/client/ydb_datastreams/datastreams.h> #include <library/cpp/threading/future/core/future.h> @@ -12,12 +13,19 @@ namespace NYql { struct IPqGateway : public TThrRefBase { using TPtr = TIntrusivePtr<IPqGateway>; + struct TListStreams { + TVector<TString> Names; + }; + virtual NThreading::TFuture<void> OpenSession(const TString& sessionId, const TString& username) = 0; virtual void CloseSession(const TString& sessionId) = 0; // CM API. virtual ::NPq::NConfigurationManager::TAsyncDescribePathResult DescribePath(const TString& sessionId, const TString& cluster, const TString& database, const TString& path, const TString& token) = 0; + // DS API. + virtual NThreading::TFuture<TListStreams> ListStreams(const TString& sessionId, const TString& cluster, const TString& database, const TString& token, ui32 limit, const TString& exclusiveStartStreamName = {}) = 0; + virtual void UpdateClusterConfigs( const TString& clusterName, const TString& endpoint, diff --git a/ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.cpp b/ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.cpp index 293bba1a42..d6d4141454 100644 --- a/ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.cpp +++ b/ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.cpp @@ -5,13 +5,14 @@ #include <ydb/library/yql/dq/actors/protos/dq_events.pb.h> #include <ydb/library/yql/dq/proto/dq_checkpoint.pb.h> -#include <ydb/library/yql/utils/actor_log/log.h> -#include <ydb/library/yql/utils/log/log.h> #include <ydb/library/yql/minikql/comp_nodes/mkql_saveload.h> #include <ydb/library/yql/minikql/mkql_alloc.h> #include <ydb/library/yql/minikql/mkql_string_util.h> -#include <ydb/library/yql/utils/yql_panic.h> +#include <ydb/library/yql/utils/actor_log/log.h> #include <ydb/library/yql/utils/actors/http_sender_actor.h> +#include <ydb/library/yql/utils/log/log.h> +#include <ydb/library/yql/utils/url_builder.h> +#include <ydb/library/yql/utils/yql_panic.h> #include <library/cpp/actors/core/actor.h> #include <library/cpp/actors/core/event_local.h> @@ -75,6 +76,32 @@ struct TMetricsInflight { } // namespace +TString GetSolomonUrl(const TString& endpoint, bool useSsl, const TString& project, const TString& cluster, const TString& service, const ::NYql::NSo::NProto::ESolomonClusterType& type) { + TUrlBuilder builder((useSsl ? "https://" : "http://") + endpoint); + + switch (type) { + case NSo::NProto::ESolomonClusterType::CT_SOLOMON: { + builder.AddPathComponent("api"); + builder.AddPathComponent("v2"); + builder.AddPathComponent("push"); + builder.AddUrlParam("project", project); + builder.AddUrlParam("cluster", cluster); + builder.AddUrlParam("service", service); + break; + } + case NSo::NProto::ESolomonClusterType::CT_MONITORING: { + builder.AddPathComponent("monitoring/v2/data/write"); + builder.AddUrlParam("folderId", cluster); + builder.AddUrlParam("service", service); + break; + } + default: + Y_ENSURE(false, "Invalid cluster type " << ToString<ui32>(type)); + } + + return builder.Build(); +} + class TDqSolomonWriteActor : public NActors::TActor<TDqSolomonWriteActor>, public IDqComputeActorAsyncOutput { public: static constexpr char ActorName[] = "DQ_SOLOMON_WRITE_ACTOR"; @@ -246,29 +273,12 @@ private: NDqProto::TSinkState BuildState() { return {}; } TString GetUrl() const { - TStringBuilder builder; - builder << (WriteParams.Shard.GetUseSsl() ? "https://" : "http://"); - builder << WriteParams.Shard.GetEndpoint(); - - switch (WriteParams.Shard.GetClusterType()) { - case NSo::NProto::ESolomonClusterType::CT_SOLOMON: { - builder << "/api/v2/push"; - builder << "?project=" << WriteParams.Shard.GetProject(); - builder << "&cluster=" << WriteParams.Shard.GetCluster(); - builder << "&service=" << WriteParams.Shard.GetService(); - break; - } - case NSo::NProto::ESolomonClusterType::CT_MONITORING: { - builder << "/monitoring/v2/data/write"; - builder << "?folderId=" << WriteParams.Shard.GetCluster(); - builder << "&service=" << WriteParams.Shard.GetService(); - break; - } - default: - Y_ENSURE(false, "Invalid cluster type " << ToString<ui32>(WriteParams.Shard.GetClusterType())); - } - - return builder; + return GetSolomonUrl(WriteParams.Shard.GetEndpoint(), + WriteParams.Shard.GetUseSsl(), + WriteParams.Shard.GetProject(), + WriteParams.Shard.GetCluster(), + WriteParams.Shard.GetService(), + WriteParams.Shard.GetClusterType()); } void PushMetricsToBuffer(ui64& metricsCount) { diff --git a/ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.h b/ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.h index de6df23756..aa5469f10b 100644 --- a/ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.h +++ b/ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.h @@ -29,4 +29,6 @@ std::pair<NYql::NDq::IDqComputeActorAsyncOutput*, NActors::IActor*> CreateDqSolo void RegisterDQSolomonWriteActorFactory(TDqSinkFactory& factory, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory); +TString GetSolomonUrl(const TString& endpoint, bool useSsl, const TString& project, const TString& cluster, const TString& service, const ::NYql::NSo::NProto::ESolomonClusterType& type); + } // namespace NYql::NDq |