aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorOleg Doronin <fortan57@gmail.com>2022-05-01 02:01:29 +0300
committerOleg Doronin <fortan57@gmail.com>2022-05-01 02:01:29 +0300
commit65e4760cd73b3929ab5eb735210facfff8822495 (patch)
tree1efbe70bcc64c4573fc6fd673204689b06b608ab
parentd227d4f3e8008cea9f135e271a5b7d56db6b5c44 (diff)
downloadydb-65e4760cd73b3929ab5eb735210facfff8822495.tar.gz
test connection YQ-48
test connection for monitoring os + cleanup list streams for pq test connection for yds ref:d44fd535fb66f103981f959c1d1eeeaceddb4944
-rw-r--r--CMakeLists.darwin.txt4
-rw-r--r--CMakeLists.linux.txt4
-rw-r--r--ydb/core/yq/libs/actors/clusters_from_connections.cpp139
-rw-r--r--ydb/core/yq/libs/actors/clusters_from_connections.h6
-rw-r--r--ydb/core/yq/libs/actors/logging/log.h10
-rw-r--r--ydb/core/yq/libs/common/util.h15
-rw-r--r--ydb/core/yq/libs/control_plane_storage/CMakeLists.txt2
-rw-r--r--ydb/core/yq/libs/control_plane_storage/config.cpp56
-rw-r--r--ydb/core/yq/libs/control_plane_storage/config.h32
-rw-r--r--ydb/core/yq/libs/control_plane_storage/internal/task_ping.cpp1
-rw-r--r--ydb/core/yq/libs/control_plane_storage/request_validators.cpp108
-rw-r--r--ydb/core/yq/libs/control_plane_storage/request_validators.h152
-rw-r--r--ydb/core/yq/libs/control_plane_storage/util.h16
-rw-r--r--ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage.cpp47
-rw-r--r--ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_impl.h241
-rw-r--r--ydb/core/yq/libs/init/init.cpp25
-rw-r--r--ydb/core/yq/libs/test_connection/CMakeLists.txt9
-rw-r--r--ydb/core/yq/libs/test_connection/counters.cpp21
-rw-r--r--ydb/core/yq/libs/test_connection/counters.h25
-rw-r--r--ydb/core/yq/libs/test_connection/probes.h18
-rw-r--r--ydb/core/yq/libs/test_connection/request_validators.h22
-rw-r--r--ydb/core/yq/libs/test_connection/test_connection.cpp198
-rw-r--r--ydb/core/yq/libs/test_connection/test_connection.h70
-rw-r--r--ydb/core/yq/libs/test_connection/test_data_streams.cpp293
-rw-r--r--ydb/core/yq/libs/test_connection/test_monitoring.cpp160
-rw-r--r--ydb/core/yq/libs/test_connection/test_object_storage.cpp223
-rw-r--r--ydb/library/yql/providers/pq/gateway/dummy/yql_pq_dummy_gateway.cpp5
-rw-r--r--ydb/library/yql/providers/pq/gateway/dummy/yql_pq_dummy_gateway.h15
-rw-r--r--ydb/library/yql/providers/pq/gateway/native/CMakeLists.txt1
-rw-r--r--ydb/library/yql/providers/pq/gateway/native/yql_pq_gateway.cpp12
-rw-r--r--ydb/library/yql/providers/pq/gateway/native/yql_pq_session.cpp64
-rw-r--r--ydb/library/yql/providers/pq/gateway/native/yql_pq_session.h9
-rw-r--r--ydb/library/yql/providers/pq/provider/yql_pq_gateway.h8
-rw-r--r--ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.cpp62
-rw-r--r--ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.h2
35 files changed, 1688 insertions, 387 deletions
diff --git a/CMakeLists.darwin.txt b/CMakeLists.darwin.txt
index 6756a81997..91b10039eb 100644
--- a/CMakeLists.darwin.txt
+++ b/CMakeLists.darwin.txt
@@ -808,13 +808,13 @@ add_subdirectory(ydb/core/yq/libs/health)
add_subdirectory(ydb/public/sdk/cpp/client/ydb_discovery)
add_subdirectory(ydb/core/yq/libs/test_connection)
add_subdirectory(ydb/core/yq/libs/test_connection/events)
+add_subdirectory(ydb/library/yql/providers/solomon/async_io)
+add_subdirectory(library/cpp/json/easy_parse)
add_subdirectory(ydb/library/yql/dq/comp_nodes)
add_subdirectory(ydb/library/yql/providers/clickhouse/actors)
add_subdirectory(ydb/library/yql/providers/pq/async_io)
add_subdirectory(ydb/library/yql/providers/pq/gateway/native)
add_subdirectory(ydb/library/yql/providers/s3/actors)
-add_subdirectory(ydb/library/yql/providers/solomon/async_io)
-add_subdirectory(library/cpp/json/easy_parse)
add_subdirectory(ydb/library/yql/providers/solomon/gateway)
add_subdirectory(ydb/library/yql/providers/solomon/provider)
add_subdirectory(ydb/library/yql/providers/solomon/expr_nodes)
diff --git a/CMakeLists.linux.txt b/CMakeLists.linux.txt
index 0353dc8745..da3fa70606 100644
--- a/CMakeLists.linux.txt
+++ b/CMakeLists.linux.txt
@@ -888,13 +888,13 @@ add_subdirectory(ydb/core/yq/libs/health)
add_subdirectory(ydb/public/sdk/cpp/client/ydb_discovery)
add_subdirectory(ydb/core/yq/libs/test_connection)
add_subdirectory(ydb/core/yq/libs/test_connection/events)
+add_subdirectory(ydb/library/yql/providers/solomon/async_io)
+add_subdirectory(library/cpp/json/easy_parse)
add_subdirectory(ydb/library/yql/dq/comp_nodes)
add_subdirectory(ydb/library/yql/providers/clickhouse/actors)
add_subdirectory(ydb/library/yql/providers/pq/async_io)
add_subdirectory(ydb/library/yql/providers/pq/gateway/native)
add_subdirectory(ydb/library/yql/providers/s3/actors)
-add_subdirectory(ydb/library/yql/providers/solomon/async_io)
-add_subdirectory(library/cpp/json/easy_parse)
add_subdirectory(ydb/library/yql/providers/solomon/gateway)
add_subdirectory(ydb/library/yql/providers/solomon/provider)
add_subdirectory(ydb/library/yql/providers/solomon/expr_nodes)
diff --git a/ydb/core/yq/libs/actors/clusters_from_connections.cpp b/ydb/core/yq/libs/actors/clusters_from_connections.cpp
index 93e0471892..8ddc4096ae 100644
--- a/ydb/core/yq/libs/actors/clusters_from_connections.cpp
+++ b/ydb/core/yq/libs/actors/clusters_from_connections.cpp
@@ -13,7 +13,9 @@ using namespace NYql;
namespace {
template <typename TClusterConfig>
-void FillClusterAuth(TClusterConfig& clusterCfg, const YandexQuery::IamAuth& auth, const TString& authToken, const THashMap<TString, TString>& accountIdSignatures) {
+void FillClusterAuth(TClusterConfig& clusterCfg,
+ const YandexQuery::IamAuth& auth, const TString& authToken,
+ const THashMap<TString, TString>& accountIdSignatures) {
switch (auth.identity_case()) {
case YandexQuery::IamAuth::kNone:
break;
@@ -30,8 +32,100 @@ void FillClusterAuth(TClusterConfig& clusterCfg, const YandexQuery::IamAuth& aut
}
}
+void FillPqClusterConfig(NYql::TPqClusterConfig& clusterConfig,
+ const TString& name, bool useBearerForYdb,
+ const TString& authToken, const THashMap<TString, TString>& accountIdSignatures,
+ const YandexQuery::DataStreams& ds) {
+ clusterConfig.SetName(name);
+ if (ds.endpoint()) {
+ clusterConfig.SetEndpoint(ds.endpoint());
+ }
+ clusterConfig.SetDatabase(ds.database());
+ clusterConfig.SetDatabaseId(ds.database_id());
+ clusterConfig.SetUseSsl(ds.secure());
+ clusterConfig.SetAddBearerToToken(useBearerForYdb);
+ clusterConfig.SetClusterType(TPqClusterConfig::CT_DATA_STREAMS);
+ FillClusterAuth(clusterConfig, ds.auth(), authToken, accountIdSignatures);
+}
+
+void FillS3ClusterConfig(NYql::TS3ClusterConfig& clusterConfig,
+ const TString& name, const TString& authToken,
+ const TString& objectStorageEndpoint,
+ const THashMap<TString, TString>& accountIdSignatures,
+ const YandexQuery::ObjectStorageConnection& s3) {
+ clusterConfig.SetName(name);
+ TString objectStorageUrl;
+ if (objectStorageEndpoint == "https://s3.mds.yandex.net") {
+ objectStorageUrl = TStringBuilder() << "https://" << s3.bucket() << ".s3.mds.yandex.net/";
+ } else {
+ objectStorageUrl = TStringBuilder() << objectStorageEndpoint << '/' << s3.bucket() << '/';
+ }
+ clusterConfig.SetUrl(objectStorageUrl);
+ FillClusterAuth(clusterConfig, s3.auth(), authToken, accountIdSignatures);
+}
+
+void FillSolomonClusterConfig(NYql::TSolomonClusterConfig& clusterConfig,
+ const TString& name, const TString& authToken,
+ const THashMap<TString, TString>& accountIdSignatures,
+ const YandexQuery::Monitoring& monitoring) {
+ clusterConfig.SetName(name);
+
+ // TODO: move Endpoint to yq config
+ auto solomonEndpointForTest = GetEnv("SOLOMON_ENDPOINT");
+ auto solomonEndpoint = solomonEndpointForTest ? TString(solomonEndpointForTest) : TString();
+ if (solomonEndpoint.empty()) {
+ if (name.StartsWith("pre")) {
+ solomonEndpoint = "monitoring.api.cloud-preprod.yandex.net";
+ clusterConfig.SetUseSsl(true);
+ } else if (name.StartsWith("so")) {
+ solomonEndpoint = "solomon.yandex.net";
+ } else {
+ solomonEndpoint = "monitoring.api.cloud.yandex.net";
+ clusterConfig.SetUseSsl(true);
+ }
+ }
+
+ clusterConfig.SetCluster(solomonEndpoint);
+ clusterConfig.SetClusterType(TSolomonClusterConfig::SCT_MONITORING);
+ clusterConfig.MutablePath()->SetProject(monitoring.project());
+ clusterConfig.MutablePath()->SetCluster(monitoring.cluster());
+ FillClusterAuth(clusterConfig, monitoring.auth(), authToken, accountIdSignatures);
+}
+
} //namespace
+NYql::TPqClusterConfig CreatePqClusterConfig(const TString& name,
+ bool useBearerForYdb, const TString& authToken,
+ const TString& accountSignature, const YandexQuery::DataStreams& ds) {
+ NYql::TPqClusterConfig cluster;
+ THashMap<TString, TString> accountIdSignatures;
+ if (ds.auth().has_service_account()) {
+ accountIdSignatures[ds.auth().service_account().id()] = accountSignature;
+ }
+ FillPqClusterConfig(cluster, name, useBearerForYdb, authToken, accountIdSignatures, ds);
+ return cluster;
+}
+
+NYql::TS3ClusterConfig CreateS3ClusterConfig(const TString& name,
+ const TString& authToken, const TString& objectStorageEndpoint,
+ const TString& accountSignature, const YandexQuery::ObjectStorageConnection& s3) {
+ NYql::TS3ClusterConfig cluster;
+ THashMap<TString, TString> accountIdSignatures;
+ accountIdSignatures[s3.auth().service_account().id()] = accountSignature;
+ FillS3ClusterConfig(cluster, name, authToken, objectStorageEndpoint, accountIdSignatures, s3);
+ return cluster;
+}
+
+NYql::TSolomonClusterConfig CreateSolomonClusterConfig(const TString& name,
+ const TString& authToken, const TString& accountSignature,
+ const YandexQuery::Monitoring& monitoring) {
+ NYql::TSolomonClusterConfig cluster;
+ THashMap<TString, TString> accountIdSignatures;
+ accountIdSignatures[monitoring.auth().service_account().id()] = accountSignature;
+ FillSolomonClusterConfig(cluster, name, authToken, accountIdSignatures, monitoring);
+ return cluster;
+}
+
void AddClustersFromConnections(const THashMap<TString, YandexQuery::Connection>& connections,
bool useBearerForYdb,
const TString& objectStorageEndpoint,
@@ -73,58 +167,21 @@ void AddClustersFromConnections(const THashMap<TString, YandexQuery::Connection>
case YandexQuery::ConnectionSetting::kObjectStorage: {
const auto& s3 = conn.content().setting().object_storage();
auto* clusterCfg = gatewaysConfig.MutableS3()->AddClusterMapping();
- clusterCfg->SetName(connectionName);
- TString objectStorageUrl;
- if (objectStorageEndpoint == "https://s3.mds.yandex.net") {
- objectStorageUrl = TStringBuilder() << "https://" << s3.bucket() << ".s3.mds.yandex.net/";
- } else {
- objectStorageUrl = TStringBuilder() << objectStorageEndpoint << '/' << s3.bucket() << '/';
- }
- clusterCfg->SetUrl(objectStorageUrl);
- FillClusterAuth(*clusterCfg, s3.auth(), authToken, accountIdSignatures);
+ FillS3ClusterConfig(*clusterCfg, connectionName, authToken, objectStorageEndpoint, accountIdSignatures, s3);
clusters.emplace(connectionName, S3ProviderName);
break;
}
case YandexQuery::ConnectionSetting::kDataStreams: {
const auto& ds = conn.content().setting().data_streams();
auto* clusterCfg = gatewaysConfig.MutablePq()->AddClusterMapping();
- clusterCfg->SetName(connectionName);
- if (ds.endpoint())
- clusterCfg->SetEndpoint(ds.endpoint());
- clusterCfg->SetDatabase(ds.database());
- clusterCfg->SetDatabaseId(ds.database_id());
- clusterCfg->SetUseSsl(ds.secure());
- clusterCfg->SetAddBearerToToken(useBearerForYdb);
- clusterCfg->SetClusterType(TPqClusterConfig::CT_DATA_STREAMS);
- FillClusterAuth(*clusterCfg, ds.auth(), authToken, accountIdSignatures);
+ FillPqClusterConfig(*clusterCfg, connectionName, useBearerForYdb, authToken, accountIdSignatures, ds);
clusters.emplace(connectionName, PqProviderName);
break;
}
case YandexQuery::ConnectionSetting::kMonitoring: {
const auto& monitoring = conn.content().setting().monitoring();
auto* clusterCfg = gatewaysConfig.MutableSolomon()->AddClusterMapping();
- clusterCfg->SetName(connectionName);
-
- // TODO: move Endpoint to yq config
- auto solomonEndpointForTest = GetEnv("SOLOMON_ENDPOINT");
- auto solomonEndpoint = solomonEndpointForTest ? TString(solomonEndpointForTest) : TString();
- if (solomonEndpoint.empty()) {
- if (connectionName.StartsWith("pre")) {
- solomonEndpoint = "monitoring.api.cloud-preprod.yandex.net";
- clusterCfg->SetUseSsl(true);
- } else if (connectionName.StartsWith("so")) {
- solomonEndpoint = "solomon.yandex.net";
- } else {
- solomonEndpoint = "monitoring.api.cloud.yandex.net";
- clusterCfg->SetUseSsl(true);
- }
- }
-
- clusterCfg->SetCluster(solomonEndpoint);
- clusterCfg->SetClusterType(TSolomonClusterConfig::SCT_MONITORING);
- clusterCfg->MutablePath()->SetProject(monitoring.project());
- clusterCfg->MutablePath()->SetCluster(monitoring.cluster());
- FillClusterAuth(*clusterCfg, monitoring.auth(), authToken, accountIdSignatures);
+ FillSolomonClusterConfig(*clusterCfg, connectionName, authToken, accountIdSignatures, monitoring);
clusters.emplace(connectionName, SolomonProviderName);
break;
}
diff --git a/ydb/core/yq/libs/actors/clusters_from_connections.h b/ydb/core/yq/libs/actors/clusters_from_connections.h
index 08db9704ea..88c925d74d 100644
--- a/ydb/core/yq/libs/actors/clusters_from_connections.h
+++ b/ydb/core/yq/libs/actors/clusters_from_connections.h
@@ -5,6 +5,12 @@
namespace NYq {
+NYql::TPqClusterConfig CreatePqClusterConfig(const TString& name, bool useBearerForYdb, const TString& authToken, const TString& accountSignature, const YandexQuery::DataStreams& ds);
+
+NYql::TS3ClusterConfig CreateS3ClusterConfig(const TString& name, const TString& authToken, const TString& objectStorageEndpoint, const TString& accountSignature, const YandexQuery::ObjectStorageConnection& s3);
+
+NYql::TSolomonClusterConfig CreateSolomonClusterConfig(const TString& name, const TString& authToken, const TString& accountSignature, const YandexQuery::Monitoring& monitoring);
+
void AddClustersFromConnections(const THashMap<TString, YandexQuery::Connection>& connections,
bool useBearerForYdb,
const TString& objectStorageEndpoint,
diff --git a/ydb/core/yq/libs/actors/logging/log.h b/ydb/core/yq/libs/actors/logging/log.h
index 6fa331b986..78270e26ed 100644
--- a/ydb/core/yq/libs/actors/logging/log.h
+++ b/ydb/core/yq/libs/actors/logging/log.h
@@ -165,6 +165,16 @@
#define LOG_YQ_TEST_CONNECTION_DEBUG(logRecordStream) LOG_STREAMS_IMPL(DEBUG, YQ_TEST_CONNECTION, logRecordStream)
#define LOG_YQ_TEST_CONNECTION_TRACE(logRecordStream) LOG_STREAMS_IMPL(TRACE, YQ_TEST_CONNECTION, logRecordStream)
+#define LOG_YQ_TEST_CONNECTION_AS_EMERG(actorSystem, logRecordStream) LOG_STREAMS_IMPL_AS(actorSystem, EMERG, YQ_TEST_CONNECTION, logRecordStream)
+#define LOG_YQ_TEST_CONNECTION_AS_ALERT(actorSystem, logRecordStream) LOG_STREAMS_IMPL_AS(actorSystem, ALERT, YQ_TEST_CONNECTION, logRecordStream)
+#define LOG_YQ_TEST_CONNECTION_AS_CRIT(actorSystem, logRecordStream) LOG_STREAMS_IMPL_AS(actorSystem, CRIT, YQ_TEST_CONNECTION, logRecordStream)
+#define LOG_YQ_TEST_CONNECTION_AS_ERROR(actorSystem, logRecordStream) LOG_STREAMS_IMPL_AS(actorSystem, ERROR, YQ_TEST_CONNECTION, logRecordStream)
+#define LOG_YQ_TEST_CONNECTION_AS_WARN(actorSystem, logRecordStream) LOG_STREAMS_IMPL_AS(actorSystem, WARN, YQ_TEST_CONNECTION, logRecordStream)
+#define LOG_YQ_TEST_CONNECTION_AS_NOTICE(actorSystem, logRecordStream) LOG_STREAMS_IMPL_AS(actorSystem, NOTICE, YQ_TEST_CONNECTION, logRecordStream)
+#define LOG_YQ_TEST_CONNECTION_AS_INFO(actorSystem, logRecordStream) LOG_STREAMS_IMPL_AS(actorSystem, INFO, YQ_TEST_CONNECTION, logRecordStream)
+#define LOG_YQ_TEST_CONNECTION_AS_DEBUG(actorSystem, logRecordStream) LOG_STREAMS_IMPL_AS(actorSystem, DEBUG, YQ_TEST_CONNECTION, logRecordStream)
+#define LOG_YQ_TEST_CONNECTION_AS_TRACE(actorSystem, logRecordStream) LOG_STREAMS_IMPL_AS(actorSystem, TRACE, YQ_TEST_CONNECTION, logRecordStream)
+
// Component: YQ_AUDIT.
#define LOG_YQ_AUDIT_SERVICE_EMERG(logRecordStream) LOG_STREAMS_IMPL(EMERG, YQ_AUDIT, logRecordStream)
#define LOG_YQ_AUDIT_SERVICE_ALERT(logRecordStream) LOG_STREAMS_IMPL(ALERT, YQ_AUDIT, logRecordStream)
diff --git a/ydb/core/yq/libs/common/util.h b/ydb/core/yq/libs/common/util.h
new file mode 100644
index 0000000000..aec53efd76
--- /dev/null
+++ b/ydb/core/yq/libs/common/util.h
@@ -0,0 +1,15 @@
+#pragma once
+
+#include <array>
+
+namespace NYq {
+
+template<std::size_t K, typename T, std::size_t N>
+auto CreateArray(const T(&list)[N]) -> std::array<T, K> {
+ static_assert(N == K, "not valid array size");
+ std::array<T, K> result;
+ std::copy(std::begin(list), std::end(list), std::begin(result));
+ return result;
+}
+
+} // namespace NYq
diff --git a/ydb/core/yq/libs/control_plane_storage/CMakeLists.txt b/ydb/core/yq/libs/control_plane_storage/CMakeLists.txt
index afa4d67a9a..6db7c58dab 100644
--- a/ydb/core/yq/libs/control_plane_storage/CMakeLists.txt
+++ b/ydb/core/yq/libs/control_plane_storage/CMakeLists.txt
@@ -36,10 +36,12 @@ target_link_libraries(yq-libs-control_plane_storage PUBLIC
yql-public-issue
)
target_sources(yq-libs-control_plane_storage PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/yq/libs/control_plane_storage/config.cpp
${CMAKE_SOURCE_DIR}/ydb/core/yq/libs/control_plane_storage/control_plane_storage_counters.cpp
${CMAKE_SOURCE_DIR}/ydb/core/yq/libs/control_plane_storage/exceptions.cpp
${CMAKE_SOURCE_DIR}/ydb/core/yq/libs/control_plane_storage/in_memory_control_plane_storage.cpp
${CMAKE_SOURCE_DIR}/ydb/core/yq/libs/control_plane_storage/probes.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/yq/libs/control_plane_storage/request_validators.cpp
${CMAKE_SOURCE_DIR}/ydb/core/yq/libs/control_plane_storage/util.cpp
${CMAKE_SOURCE_DIR}/ydb/core/yq/libs/control_plane_storage/validators.cpp
${CMAKE_SOURCE_DIR}/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage.cpp
diff --git a/ydb/core/yq/libs/control_plane_storage/config.cpp b/ydb/core/yq/libs/control_plane_storage/config.cpp
new file mode 100644
index 0000000000..afebde45a0
--- /dev/null
+++ b/ydb/core/yq/libs/control_plane_storage/config.cpp
@@ -0,0 +1,56 @@
+#include "config.h"
+#include "util.h"
+
+namespace NYq {
+
+namespace {
+
+YandexQuery::ConnectionSetting::ConnectionCase GetConnectionType(const TString& typeStr) {
+ YandexQuery::ConnectionSetting::ConnectionType type = YandexQuery::ConnectionSetting::CONNECTION_TYPE_UNSPECIFIED;
+ YandexQuery::ConnectionSetting::ConnectionType_Parse(typeStr, &type);
+ return static_cast<YandexQuery::ConnectionSetting::ConnectionCase>(type);
+}
+
+YandexQuery::BindingSetting::BindingCase GetBindingType(const TString& typeStr) {
+ YandexQuery::BindingSetting::BindingType type = YandexQuery::BindingSetting::BINDING_TYPE_UNSPECIFIED;
+ YandexQuery::BindingSetting::BindingType_Parse(typeStr, &type);
+ return static_cast<YandexQuery::BindingSetting::BindingCase>(type);
+}
+
+}
+
+TControlPlaneStorageConfig::TControlPlaneStorageConfig(const NConfig::TControlPlaneStorageConfig& config, const NConfig::TCommonConfig& common)
+ : Proto(FillDefaultParameters(config))
+ , IdsPrefix(common.GetIdsPrefix())
+ , IdempotencyKeyTtl(GetDuration(Proto.GetIdempotencyKeysTtl(), TDuration::Minutes(10)))
+ , AutomaticQueriesTtl(GetDuration(Proto.GetAutomaticQueriesTtl(), TDuration::Days(1)))
+ , ResultSetsTtl(GetDuration(Proto.GetResultSetsTtl(), TDuration::Days(1)))
+ , AnalyticsRetryCounterUpdateTime(GetDuration(Proto.GetAnalyticsRetryCounterUpdateTime(), TDuration::Days(1)))
+ , StreamingRetryCounterUpdateTime(GetDuration(Proto.GetAnalyticsRetryCounterUpdateTime(), TDuration::Days(1)))
+ , TaskLeaseTtl(GetDuration(Proto.GetTaskLeaseTtl(), TDuration::Seconds(30)))
+{
+ for (const auto& availableConnection : Proto.GetAvailableConnection()) {
+ AvailableConnections.insert(GetConnectionType(availableConnection));
+ }
+
+ for (const auto& availableBinding : Proto.GetAvailableBinding()) {
+ AvailableBindings.insert(GetBindingType(availableBinding));
+ }
+
+ for (const auto& mapping : Proto.GetRetryPolicyMapping()) {
+ auto& retryPolicy = mapping.GetPolicy();
+ auto retryCount = retryPolicy.GetRetryCount();
+ auto retryPeriod = GetDuration(retryPolicy.GetRetryPeriod(), TDuration::Hours(1));
+ auto backoffPeriod = GetDuration(retryPolicy.GetBackoffPeriod(), TDuration::Zero());
+ for (const auto statusCode: mapping.GetStatusCode()) {
+ RetryPolicies.emplace(statusCode, TRetryPolicyItem(retryCount, retryPeriod, backoffPeriod));
+ }
+ }
+
+ if (Proto.HasTaskLeaseRetryPolicy()) {
+ TaskLeaseRetryPolicy.RetryCount = Proto.GetTaskLeaseRetryPolicy().GetRetryCount();
+ TaskLeaseRetryPolicy.RetryPeriod = GetDuration(Proto.GetTaskLeaseRetryPolicy().GetRetryPeriod(), TDuration::Days(1));
+ }
+}
+
+} // NYq
diff --git a/ydb/core/yq/libs/control_plane_storage/config.h b/ydb/core/yq/libs/control_plane_storage/config.h
new file mode 100644
index 0000000000..bd637932ee
--- /dev/null
+++ b/ydb/core/yq/libs/control_plane_storage/config.h
@@ -0,0 +1,32 @@
+#pragma once
+
+#include "util.h"
+
+#include <ydb/core/yq/libs/config/protos/common.pb.h>
+#include <ydb/core/yq/libs/config/protos/control_plane_storage.pb.h>
+#include <ydb/public/api/protos/yq.pb.h>
+
+#include <util/datetime/base.h>
+#include <util/generic/hash.h>
+#include <util/generic/set.h>
+
+namespace NYq {
+
+struct TControlPlaneStorageConfig {
+ NConfig::TControlPlaneStorageConfig Proto;
+ TString IdsPrefix;
+ TDuration IdempotencyKeyTtl;
+ TDuration AutomaticQueriesTtl;
+ TDuration ResultSetsTtl;
+ TDuration AnalyticsRetryCounterUpdateTime;
+ TDuration StreamingRetryCounterUpdateTime;
+ TDuration TaskLeaseTtl;
+ TSet<YandexQuery::ConnectionSetting::ConnectionCase> AvailableConnections;
+ TSet<YandexQuery::BindingSetting::BindingCase> AvailableBindings;
+ THashMap<ui64, TRetryPolicyItem> RetryPolicies;
+ TRetryPolicyItem TaskLeaseRetryPolicy;
+
+ TControlPlaneStorageConfig(const NConfig::TControlPlaneStorageConfig& config, const NConfig::TCommonConfig& common);
+};
+
+} // NYq
diff --git a/ydb/core/yq/libs/control_plane_storage/internal/task_ping.cpp b/ydb/core/yq/libs/control_plane_storage/internal/task_ping.cpp
index 74616c9caf..5d33c45bc3 100644
--- a/ydb/core/yq/libs/control_plane_storage/internal/task_ping.cpp
+++ b/ydb/core/yq/libs/control_plane_storage/internal/task_ping.cpp
@@ -99,7 +99,6 @@ std::tuple<TString, TParams, const std::function<std::pair<TString, NYdb::TParam
TDuration backoff = taskLeaseTtl;
if (request->ResignQuery) {
-
TRetryPolicyItem policy(0, TDuration::Seconds(1), TDuration::Zero());
auto it = retryPolicies.find(request->StatusCode);
auto policyFound = it != retryPolicies.end();
diff --git a/ydb/core/yq/libs/control_plane_storage/request_validators.cpp b/ydb/core/yq/libs/control_plane_storage/request_validators.cpp
new file mode 100644
index 0000000000..861d90e7d9
--- /dev/null
+++ b/ydb/core/yq/libs/control_plane_storage/request_validators.cpp
@@ -0,0 +1,108 @@
+#include "request_validators.h"
+
+namespace NYq {
+
+NYql::TIssues ValidateConnectionSetting(const YandexQuery::ConnectionSetting& setting, const TSet<YandexQuery::ConnectionSetting::ConnectionCase>& availableConnections, bool disableCurrentIam, bool clickHousePasswordRequire) {
+ NYql::TIssues issues;
+ if (!availableConnections.contains(setting.connection_case())) {
+ issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "connection of the specified type is disabled"));
+ }
+
+ switch (setting.connection_case()) {
+ case YandexQuery::ConnectionSetting::kYdbDatabase: {
+ const YandexQuery::YdbDatabase database = setting.ydb_database();
+ if (!database.has_auth() || database.auth().identity_case() == YandexQuery::IamAuth::IDENTITY_NOT_SET) {
+ issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "content.setting.ydb_database.auth field is not specified"));
+ }
+
+ if (database.auth().identity_case() == YandexQuery::IamAuth::kCurrentIam && disableCurrentIam) {
+ issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "current iam authorization is disabled"));
+ }
+
+ if (!database.database_id() && !(database.endpoint() && database.database())) {
+ issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "content.setting.ydb_database.{database_id or database,endpoint} field is not specified"));
+ }
+ break;
+ }
+ case YandexQuery::ConnectionSetting::kClickhouseCluster: {
+ const YandexQuery::ClickHouseCluster ch = setting.clickhouse_cluster();
+ if (!ch.has_auth() || ch.auth().identity_case() == YandexQuery::IamAuth::IDENTITY_NOT_SET) {
+ issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "content.setting.clickhouse_cluster.auth field is not specified"));
+ }
+
+ if (ch.auth().identity_case() == YandexQuery::IamAuth::kCurrentIam && disableCurrentIam) {
+ issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "current iam authorization is disabled"));
+ }
+
+ if (!ch.database_id() && !(ch.host() && ch.port())) {
+ issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "content.setting.clickhouse_cluster.{database_id or host,port} field is not specified"));
+ }
+
+ if (!ch.login()) {
+ issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "content.setting.clickhouse_cluster.login field is not specified"));
+ }
+
+ if (!ch.password() && clickHousePasswordRequire) {
+ issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "content.setting.clickhouse_cluster.password field is not specified"));
+ }
+ break;
+ }
+ case YandexQuery::ConnectionSetting::kObjectStorage: {
+ const YandexQuery::ObjectStorageConnection objectStorage = setting.object_storage();
+ if (!objectStorage.has_auth() || objectStorage.auth().identity_case() == YandexQuery::IamAuth::IDENTITY_NOT_SET) {
+ issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "content.setting.object_storage.auth field is not specified"));
+ }
+
+ if (objectStorage.auth().identity_case() == YandexQuery::IamAuth::kCurrentIam && disableCurrentIam) {
+ issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "current iam authorization is disabled"));
+ }
+
+ if (!objectStorage.bucket()) {
+ issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "content.setting.object_storage.bucket field is not specified"));
+ }
+ break;
+ }
+ case YandexQuery::ConnectionSetting::kDataStreams: {
+ const YandexQuery::DataStreams dataStreams = setting.data_streams();
+ if (!dataStreams.has_auth() || dataStreams.auth().identity_case() == YandexQuery::IamAuth::IDENTITY_NOT_SET) {
+ issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "content.setting.data_streams.auth field is not specified"));
+ }
+
+ if (dataStreams.auth().identity_case() == YandexQuery::IamAuth::kCurrentIam && disableCurrentIam) {
+ issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "current iam authorization is disabled"));
+ }
+
+ if (!dataStreams.database_id() && !(dataStreams.endpoint() && dataStreams.database())) {
+ issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "content.setting.data_streams.{database_id or database,endpoint} field is not specified"));
+ }
+ break;
+ }
+ case YandexQuery::ConnectionSetting::kMonitoring: {
+ const YandexQuery::Monitoring monitoring = setting.monitoring();
+ if (!monitoring.has_auth() || monitoring.auth().identity_case() == YandexQuery::IamAuth::IDENTITY_NOT_SET) {
+ issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "content.setting.monitoring.auth field is not specified"));
+ }
+
+ if (monitoring.auth().identity_case() == YandexQuery::IamAuth::kCurrentIam && disableCurrentIam) {
+ issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "current iam authorization is disabled"));
+ }
+
+ if (!monitoring.project()) {
+ issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "content.setting.monitoring.project field is not specified"));
+ }
+
+ if (!monitoring.cluster()) {
+ issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "content.setting.monitoring.cluster field is not specified"));
+ }
+ break;
+ }
+ case YandexQuery::ConnectionSetting::CONNECTION_NOT_SET: {
+ issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "connection is not set"));
+ break;
+ }
+ // Do not add default. Adding a new connection should cause a compilation error
+ }
+ return issues;
+}
+
+} // namespace NYq
diff --git a/ydb/core/yq/libs/control_plane_storage/request_validators.h b/ydb/core/yq/libs/control_plane_storage/request_validators.h
new file mode 100644
index 0000000000..0e91b2b03f
--- /dev/null
+++ b/ydb/core/yq/libs/control_plane_storage/request_validators.h
@@ -0,0 +1,152 @@
+#pragma once
+
+#include <ydb/core/yq/libs/config/yq_issue.h>
+#include <ydb/library/yql/public/issue/yql_issue.h>
+#include <ydb/public/api/protos/yq.pb.h>
+
+#include <util/generic/fwd.h>
+#include <util/generic/set.h>
+#include <util/string/builder.h>
+#include <util/string/cast.h>
+
+namespace NYq {
+
+template<class P>
+NYql::TIssues ValidateEvent(P& ev, size_t maxSize)
+{
+ const auto& request = ev->Get()->Request;
+ const TString& scope = ev->Get()->Scope;
+ const TString& user = ev->Get()->User;
+ const TString& token = ev->Get()->Token;
+ const size_t byteSize = request.ByteSizeLong();
+
+ NYql::TIssues issues;
+ if (!scope) {
+ issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "scope is not specified"));
+ }
+
+ if (!user) {
+ issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "user is empty"));
+ }
+
+ if (!token) {
+ issues.AddIssue(MakeErrorIssue(TIssuesIds::ACCESS_DENIED, "token is empty"));
+ }
+
+ if (byteSize > maxSize) {
+ issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, TStringBuilder{} << "request size exceeded " << maxSize << " bytes. Request size: " << byteSize));
+ }
+
+ TString error;
+ if (!request.validate(error)) {
+ issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, error));
+ }
+
+ return issues;
+}
+
+template<typename T>
+NYql::TIssues ValidateQuery(T& ev, size_t maxSize)
+{
+ NYql::TIssues issues = ValidateEvent(ev, maxSize);
+ auto& request = ev->Get()->Request;
+ const auto& content = request.content();
+
+ if (request.execute_mode() == YandexQuery::ExecuteMode::EXECUTE_MODE_UNSPECIFIED) {
+ issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "execute_mode field is not specified"));
+ }
+
+ if (content.type() == YandexQuery::QueryContent::QUERY_TYPE_UNSPECIFIED) {
+ issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "type field is not specified"));
+ }
+
+ if (content.acl().visibility() == YandexQuery::Acl::VISIBILITY_UNSPECIFIED) {
+ issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "acl.visibility field is not specified"));
+ }
+
+ if (content.type() == YandexQuery::QueryContent::STREAMING && !request.has_disposition()) {
+ request.mutable_disposition()->mutable_fresh();
+ }
+
+ return issues;
+}
+
+template<typename T>
+ NYql::TIssues ValidateBinding(T& ev, size_t maxSize, const TSet<YandexQuery::BindingSetting::BindingCase>& availableBindings)
+{
+ const auto& request = ev->Get()->Request;
+ NYql::TIssues issues = ValidateEvent(ev, maxSize);
+
+ if (request.has_content()) {
+ const YandexQuery::BindingContent& content = request.content();
+ if (content.acl().visibility() == YandexQuery::Acl::VISIBILITY_UNSPECIFIED) {
+ issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "binding.acl.visibility field is not specified"));
+ }
+
+ if (content.name() != to_lower(content.name())) {
+ issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, TStringBuilder{} << "Incorrect binding name: " << content.name() << ". Please use only lower case"));
+ }
+
+ if (!content.has_setting()) {
+ issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "binding.setting field is not specified"));
+ }
+
+ const YandexQuery::BindingSetting& setting = content.setting();
+ if (!availableBindings.contains(setting.binding_case())) {
+ issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "binding of the specified type is disabled"));
+ }
+
+ switch (setting.binding_case()) {
+ case YandexQuery::BindingSetting::kDataStreams: {
+ const YandexQuery::DataStreamsBinding dataStreams = setting.data_streams();
+ if (!dataStreams.has_schema()) {
+ issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "data streams with empty schema is forbidden"));
+ }
+ break;
+ }
+ case YandexQuery::BindingSetting::BINDING_NOT_SET: {
+ issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "binding is not set"));
+ break;
+ }
+ // Do not replace with default. Adding a new binding should cause a compilation error
+ case YandexQuery::BindingSetting::kObjectStorage:
+ break;
+ }
+ } else {
+ issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "content field is not specified"));
+ }
+
+ return issues;
+}
+
+NYql::TIssues ValidateConnectionSetting(const YandexQuery::ConnectionSetting& setting, const TSet<YandexQuery::ConnectionSetting::ConnectionCase>& availableConnections, bool disableCurrentIam, bool clickHousePasswordRequire = true);
+
+template<typename T>
+NYql::TIssues ValidateConnection(T& ev, size_t maxSize, const TSet<YandexQuery::ConnectionSetting::ConnectionCase>& availableConnections, bool disableCurrentIam, bool clickHousePasswordRequire = true)
+{
+ const auto& request = ev->Get()->Request;
+ NYql::TIssues issues = ValidateEvent(ev, maxSize);
+
+ if (!request.has_content()) {
+ issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "content field is not specified"));
+ }
+
+ const YandexQuery::ConnectionContent& content = request.content();
+ if (content.acl().visibility() == YandexQuery::Acl::VISIBILITY_UNSPECIFIED) {
+ issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "content.acl.visibility field is not specified"));
+ }
+
+ if (content.name() != to_lower(content.name())) {
+ issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, TStringBuilder{} << "Incorrect connection name: " << content.name() << ". Please use only lower case"));
+ }
+
+ if (!content.has_setting()) {
+ issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "content.setting field is not specified"));
+ }
+
+ const YandexQuery::ConnectionSetting& setting = content.setting();
+ issues.AddIssues(ValidateConnectionSetting(setting, availableConnections, disableCurrentIam, clickHousePasswordRequire));
+ return issues;
+}
+
+} // namespace NYq
diff --git a/ydb/core/yq/libs/control_plane_storage/util.h b/ydb/core/yq/libs/control_plane_storage/util.h
index 27ada7b03a..9cf5dfe656 100644
--- a/ydb/core/yq/libs/control_plane_storage/util.h
+++ b/ydb/core/yq/libs/control_plane_storage/util.h
@@ -1,12 +1,12 @@
#pragma once
+#include <ydb/core/yq/libs/common/util.h>
+#include <ydb/core/yq/libs/config/protos/control_plane_storage.pb.h>
#include <ydb/core/yq/libs/control_plane_storage/events/events.h>
-#include <util/datetime/base.h>
-
-#include <contrib/libs/protobuf/src/google/protobuf/timestamp.pb.h>
+#include <google/protobuf/timestamp.pb.h>
-#include <ydb/core/yq/libs/config/protos/control_plane_storage.pb.h>
+#include <util/datetime/base.h>
namespace NYq {
@@ -38,14 +38,6 @@ TDuration GetDuration(const TString& value, const TDuration& defaultValue);
NConfig::TControlPlaneStorageConfig FillDefaultParameters(NConfig::TControlPlaneStorageConfig config);
-template<std::size_t K, typename T, std::size_t N>
-auto CreateArray(const T(&list)[N]) -> std::array<T, K> {
- static_assert(N == K, "not valid array size");
- std::array<T, K> result;
- std::copy(std::begin(list), std::end(list), std::begin(result));
- return result;
-}
-
bool DoesPingTaskUpdateQueriesTable(const TEvControlPlaneStorage::TEvPingTaskRequest* request);
NYdb::TValue PackItemsToList(const TVector<NYdb::TValue>& items);
diff --git a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage.cpp b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage.cpp
index acc2fc3543..c1983795b3 100644
--- a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage.cpp
+++ b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage.cpp
@@ -21,18 +21,6 @@ void CollectDebugInfo(const TString& query, const TParams& params, TSession sess
}
}
-inline YandexQuery::ConnectionSetting::ConnectionCase GetConnectionType(const TString& typeStr) {
- YandexQuery::ConnectionSetting::ConnectionType type = YandexQuery::ConnectionSetting::CONNECTION_TYPE_UNSPECIFIED;
- YandexQuery::ConnectionSetting::ConnectionType_Parse(typeStr, &type);
- return static_cast<YandexQuery::ConnectionSetting::ConnectionCase>(type);
-}
-
-inline YandexQuery::BindingSetting::BindingCase GetBindingType(const TString& typeStr) {
- YandexQuery::BindingSetting::BindingType type = YandexQuery::BindingSetting::BINDING_TYPE_UNSPECIFIED;
- YandexQuery::BindingSetting::BindingType_Parse(typeStr, &type);
- return static_cast<YandexQuery::BindingSetting::BindingCase>(type);
-}
-
ERetryErrorClass RetryFunc(const NYdb::TStatus& status) {
return status.GetStatus() == NYdb::EStatus::OVERLOADED ? ERetryErrorClass::LongRetry : ERetryErrorClass::ShortRetry;
}
@@ -61,41 +49,6 @@ void TYdbControlPlaneStorageActor::Bootstrap() {
Become(&TThis::StateFunc);
}
-TYdbControlPlaneStorageActor::TConfig::TConfig(const NConfig::TControlPlaneStorageConfig& config, const NConfig::TCommonConfig& common)
- : Proto(FillDefaultParameters(config))
- , IdsPrefix(common.GetIdsPrefix())
- , IdempotencyKeyTtl(GetDuration(Proto.GetIdempotencyKeysTtl(), TDuration::Minutes(10)))
- , AutomaticQueriesTtl(GetDuration(Proto.GetAutomaticQueriesTtl(), TDuration::Days(1)))
- , ResultSetsTtl(GetDuration(Proto.GetResultSetsTtl(), TDuration::Days(1)))
- , TaskLeaseTtl(GetDuration(Proto.GetTaskLeaseTtl(), TDuration::Seconds(30)))
-{
- for (const auto& availableConnection: Proto.GetAvailableConnection()) {
- AvailableConnections.insert(GetConnectionType(availableConnection));
- }
-
- for (const auto& availableBinding: Proto.GetAvailableBinding()) {
- AvailableBindings.insert(GetBindingType(availableBinding));
- }
-
- for (const auto& mapping: Proto.GetRetryPolicyMapping()) {
- auto& retryPolicy = mapping.GetPolicy();
- auto retryCount = retryPolicy.GetRetryCount();
- auto retryPeriod = GetDuration(retryPolicy.GetRetryPeriod(), TDuration::Hours(1));
- auto backoffPeriod = GetDuration(retryPolicy.GetBackoffPeriod(), TDuration::Zero());
- for (const auto statusCode: mapping.GetStatusCode()) {
- RetryPolicies.emplace(statusCode, TRetryPolicyItem(retryCount, retryPeriod, backoffPeriod));
- }
- }
-
- if (Proto.HasTaskLeaseRetryPolicy()) {
- TaskLeaseRetryPolicy.RetryCount = Proto.GetTaskLeaseRetryPolicy().GetRetryCount();
- TaskLeaseRetryPolicy.RetryPeriod = GetDuration(Proto.GetTaskLeaseRetryPolicy().GetRetryPeriod(), TDuration::Days(1));
- } else {
- TaskLeaseRetryPolicy.RetryCount = 20;
- TaskLeaseRetryPolicy.RetryPeriod = TDuration::Days(1);
- }
-}
-
/*
* Creating tables
*/
diff --git a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_impl.h b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_impl.h
index 55e88913bb..ac2c03c73e 100644
--- a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_impl.h
+++ b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_impl.h
@@ -1,13 +1,15 @@
#pragma once
+#include "config.h"
#include "control_plane_storage.h"
#include "control_plane_storage_counters.h"
#include "exceptions.h"
#include "extractors.h"
-#include <ydb/core/yq/libs/control_plane_storage/internal/response_tasks.h>
#include "probes.h"
+#include "request_validators.h"
#include "util.h"
#include "validators.h"
+#include <ydb/core/yq/libs/control_plane_storage/internal/response_tasks.h>
#include <util/generic/guid.h>
#include <util/system/yassert.h>
@@ -217,25 +219,9 @@ class TYdbControlPlaneStorageActor : public NActors::TActorBootstrapped<TYdbCont
}
};
-private:
- struct TConfig {
- NConfig::TControlPlaneStorageConfig Proto;
- TString IdsPrefix;
- TDuration IdempotencyKeyTtl;
- TDuration AutomaticQueriesTtl;
- TDuration ResultSetsTtl;
- TDuration TaskLeaseTtl;
- TSet<YandexQuery::ConnectionSetting::ConnectionCase> AvailableConnections;
- TSet<YandexQuery::BindingSetting::BindingCase> AvailableBindings;
- THashMap<ui64, TRetryPolicyItem> RetryPolicies;
- TRetryPolicyItem TaskLeaseRetryPolicy;
-
- TConfig(const NConfig::TControlPlaneStorageConfig& config, const NConfig::TCommonConfig& common);
- };
-
TCounters Counters;
- TConfig Config;
+ ::NYq::TControlPlaneStorageConfig Config;
TYdbConnectionPtr YdbConnection;
@@ -327,174 +313,15 @@ public:
template<typename T>
NYql::TIssues ValidateConnection(T& ev, bool clickHousePasswordRequire = true)
{
- const auto& request = ev->Get()->Request;
- NYql::TIssues issues = ValidateEvent(ev);
-
- if (!request.has_content()) {
- issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "content field is not specified"));
- }
-
- const YandexQuery::ConnectionContent& content = request.content();
- if (content.acl().visibility() == YandexQuery::Acl::VISIBILITY_UNSPECIFIED) {
- issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "content.acl.visibility field is not specified"));
- }
-
- if (content.name() != to_lower(content.name())) {
- issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "Incorrect connection name: " + content.name() + ". Please use only lower case"));
- }
-
- if (!content.has_setting()) {
- issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "content.setting field is not specified"));
- }
-
- const YandexQuery::ConnectionSetting& setting = content.setting();
- if (!Config.AvailableConnections.contains(setting.connection_case())) {
- issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "connection of the specified type is disabled"));
- }
-
- switch (setting.connection_case()) {
- case YandexQuery::ConnectionSetting::kYdbDatabase: {
- const YandexQuery::YdbDatabase database = setting.ydb_database();
- if (!database.has_auth() || database.auth().identity_case() == YandexQuery::IamAuth::IDENTITY_NOT_SET) {
- issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "content.setting.ydb_database.auth field is not specified"));
- }
-
- if (database.auth().identity_case() == YandexQuery::IamAuth::kCurrentIam && Config.Proto.GetDisableCurrentIam()) {
- issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "current iam authorization is disabled"));
- }
-
- if (!database.database_id() && !(database.endpoint() && database.database())) {
- issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "content.setting.ydb_database.{database_id||database,endpoint} field is not specified"));
- }
- break;
- }
- case YandexQuery::ConnectionSetting::kClickhouseCluster: {
- const YandexQuery::ClickHouseCluster ch = setting.clickhouse_cluster();
- if (!ch.has_auth() || ch.auth().identity_case() == YandexQuery::IamAuth::IDENTITY_NOT_SET) {
- issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "content.setting.clickhouse_cluster.auth field is not specified"));
- }
-
- if (ch.auth().identity_case() == YandexQuery::IamAuth::kCurrentIam && Config.Proto.GetDisableCurrentIam()) {
- issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "current iam authorization is disabled"));
- }
-
- if (!ch.database_id() && !(ch.host() && ch.port())) {
- issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "content.setting.clickhouse_cluster.{database_id||host,port} field is not specified"));
- }
-
- if (!ch.login()) {
- issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "content.setting.clickhouse_cluster.login field is not specified"));
- }
-
- if (!ch.password() && clickHousePasswordRequire) {
- issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "content.setting.clickhouse_cluster.password field is not specified"));
- }
- break;
- }
- case YandexQuery::ConnectionSetting::kObjectStorage: {
- const YandexQuery::ObjectStorageConnection objectStorage = setting.object_storage();
- if (!objectStorage.has_auth() || objectStorage.auth().identity_case() == YandexQuery::IamAuth::IDENTITY_NOT_SET) {
- issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "content.setting.object_storage.auth field is not specified"));
- }
-
- if (objectStorage.auth().identity_case() == YandexQuery::IamAuth::kCurrentIam && Config.Proto.GetDisableCurrentIam()) {
- issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "current iam authorization is disabled"));
- }
-
- if (!objectStorage.bucket()) {
- issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "content.setting.object_storage.bucket field is not specified"));
- }
- break;
- }
- case YandexQuery::ConnectionSetting::kDataStreams: {
- const YandexQuery::DataStreams dataStreams = setting.data_streams();
- if (!dataStreams.has_auth() || dataStreams.auth().identity_case() == YandexQuery::IamAuth::IDENTITY_NOT_SET) {
- issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "content.setting.data_streams.auth field is not specified"));
- }
-
- if (dataStreams.auth().identity_case() == YandexQuery::IamAuth::kCurrentIam && Config.Proto.GetDisableCurrentIam()) {
- issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "current iam authorization is disabled"));
- }
-
- if (!dataStreams.database_id() && !(dataStreams.endpoint() && dataStreams.database())) {
- issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "content.setting.data_streams.{database_id||database,endpoint} field is not specified"));
- }
- break;
- }
- case YandexQuery::ConnectionSetting::kMonitoring: {
- const YandexQuery::Monitoring monitoring = setting.monitoring();
- if (!monitoring.has_auth() || monitoring.auth().identity_case() == YandexQuery::IamAuth::IDENTITY_NOT_SET) {
- issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "content.setting.monitoring.auth field is not specified"));
- }
-
- if (monitoring.auth().identity_case() == YandexQuery::IamAuth::kCurrentIam && Config.Proto.GetDisableCurrentIam()) {
- issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "current iam authorization is disabled"));
- }
-
- if (!monitoring.project()) {
- issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "content.setting.monitoring.project field is not specified"));
- }
-
- if (!monitoring.cluster()) {
- issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "content.setting.monitoring.cluster field is not specified"));
- }
- break;
- }
- case YandexQuery::ConnectionSetting::CONNECTION_NOT_SET: {
- issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "connection is not set"));
- break;
- }
- // Do not add default. Adding a new connection should cause a compilation error
- }
- return issues;
+ return ::NYq::ValidateConnection<T>(ev, Config.Proto.GetMaxRequestSize(),
+ Config.AvailableConnections, Config.Proto.GetDisableCurrentIam(),
+ clickHousePasswordRequire);
}
template<typename T>
NYql::TIssues ValidateBinding(T& ev)
{
- const auto& request = ev->Get()->Request;
- NYql::TIssues issues = ValidateEvent(ev);
-
- if (request.has_content()) {
- const YandexQuery::BindingContent& content = request.content();
- if (content.acl().visibility() == YandexQuery::Acl::VISIBILITY_UNSPECIFIED) {
- issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "binding.acl.visibility field is not specified"));
- }
-
- if (content.name() != to_lower(content.name())) {
- issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "Incorrect binding name: " + content.name() + ". Please use only lower case"));
- }
-
- if (!content.has_setting()) {
- issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "binding.setting field is not specified"));
- }
-
- const YandexQuery::BindingSetting& setting = content.setting();
- if (!Config.AvailableBindings.contains(setting.binding_case())) {
- issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "binding of the specified type is disabled"));
- }
-
- switch (setting.binding_case()) {
- case YandexQuery::BindingSetting::kDataStreams: {
- const YandexQuery::DataStreamsBinding dataStreams = setting.data_streams();
- if (!dataStreams.has_schema()) {
- issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "data streams with empty schema is forbidden"));
- }
- break;
- }
- case YandexQuery::BindingSetting::BINDING_NOT_SET: {
- issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "binding is not set"));
- break;
- }
- // Do not replace with default. Adding a new binding should cause a compilation error
- case YandexQuery::BindingSetting::kObjectStorage:
- break;
- }
- } else {
- issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "binding field is not specified"));
- }
-
- return issues;
+ return ::NYq::ValidateBinding<T>(ev, Config.Proto.GetMaxRequestSize(), Config.AvailableBindings);
}
void Handle(NMon::TEvHttpInfo::TPtr& ev) {
@@ -512,61 +339,13 @@ public:
template<typename T>
NYql::TIssues ValidateQuery(T& ev)
{
- NYql::TIssues issues = ValidateEvent(ev);
- auto& request = ev->Get()->Request;
- const auto& content = request.content();
-
- if (request.execute_mode() == YandexQuery::ExecuteMode::EXECUTE_MODE_UNSPECIFIED) {
- issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "mode field is not specified"));
- }
-
- if (content.type() == YandexQuery::QueryContent::QUERY_TYPE_UNSPECIFIED) {
- issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "type field is not specified"));
- }
-
- if (content.acl().visibility() == YandexQuery::Acl::VISIBILITY_UNSPECIFIED) {
- issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "acl.visibility field is not specified"));
- }
-
- if (content.type() == YandexQuery::QueryContent::STREAMING && !request.has_disposition()) {
- request.mutable_disposition()->mutable_fresh();
- }
-
- return issues;
+ return ::NYq::ValidateQuery<T>(ev, Config.Proto.GetMaxRequestSize());
}
template<class P>
NYql::TIssues ValidateEvent(P& ev)
{
- const auto& request = ev->Get()->Request;
- const TString scope = ev->Get()->Scope;
- const TString user = ev->Get()->User;
- const TString token = ev->Get()->Token;
- const int byteSize = request.ByteSize();
-
- NYql::TIssues issues;
- if (!scope) {
- issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "scope is not specified"));
- }
-
- if (!user) {
- issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "user is empty"));
- }
-
- if (!token) {
- issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "token is empty"));
- }
-
- if (byteSize > static_cast<int>(Config.Proto.GetMaxRequestSize())) {
- issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "request size exceeded " + ToString(request.ByteSize()) + " out of " + ToString(Config.Proto.GetMaxRequestSize()) + " bytes"));
- }
-
- TString error;
- if (!request.validate(error)) {
- issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, error));
- }
-
- return issues;
+ return ::NYq::ValidateEvent<P>(ev, Config.Proto.GetMaxRequestSize());
}
/*
diff --git a/ydb/core/yq/libs/init/init.cpp b/ydb/core/yq/libs/init/init.cpp
index 7946e71e8c..348a474997 100644
--- a/ydb/core/yq/libs/init/init.cpp
+++ b/ydb/core/yq/libs/init/init.cpp
@@ -75,13 +75,6 @@ void Init(
actorRegistrator(NYq::ControlPlaneStorageServiceActorId(), controlPlaneStorage);
}
- if (protoConfig.GetTestConnection().GetEnabled()) {
- auto testConnection = NYq::CreateTestConnectionActor(
- protoConfig.GetTestConnection(),
- appData->Counters->GetSubgroup("counters", "yq")->GetSubgroup("subsystem", "TestConnection"));
- actorRegistrator(NYq::TestConnectionActorId(), testConnection);
- }
-
if (protoConfig.GetControlPlaneProxy().GetEnabled()) {
auto controlPlaneProxy = NYq::CreateControlPlaneProxyActor(protoConfig.GetControlPlaneProxy(),
appData->Counters->GetSubgroup("counters", "yq")->GetSubgroup("subsystem", "ControlPlaneProxy"));
@@ -135,6 +128,9 @@ void Init(
}
credentialsFactory = NYql::CreateSecuredServiceAccountCredentialsOverTokenAccessorFactory(tokenAccessorConfig.GetEndpoint(), tokenAccessorConfig.GetUseSsl(), caContent);
+ }
+
+ if (protoConfig.GetPrivateApi().GetEnabled()) {
RegisterDqPqReadActorFactory(*sourceActorFactory, yqSharedResources->UserSpaceYdbDriver, credentialsFactory, !protoConfig.GetReadActorsFactoryConfig().GetPqReadActorFactoryConfig().GetCookieCommitMode());
RegisterYdbReadActorFactory(*sourceActorFactory, yqSharedResources->UserSpaceYdbDriver, credentialsFactory);
RegisterS3ReadActorFactory(*sourceActorFactory, credentialsFactory,
@@ -205,6 +201,21 @@ void Init(
auto httpProxy = NHttp::CreateHttpProxy(NMonitoring::TMetricRegistry::SharedInstance());
actorRegistrator(MakeYqlAnalyticsHttpProxyId(), httpProxy);
+ if (protoConfig.GetTestConnection().GetEnabled()) {
+ auto testConnection = NYq::CreateTestConnectionActor(
+ protoConfig.GetTestConnection(),
+ protoConfig.GetControlPlaneStorage(),
+ protoConfig.GetCommon(),
+ protoConfig.GetTokenAccessor(),
+ yqSharedResources,
+ credentialsFactory,
+ pqCmConnections,
+ appData->FunctionRegistry,
+ httpGateway,
+ appData->Counters->GetSubgroup("counters", "yq")->GetSubgroup("subsystem", "TestConnection"));
+ actorRegistrator(NYq::TestConnectionActorId(), testConnection);
+ }
+
if (protoConfig.GetPendingFetcher().GetEnabled()) {
auto fetcher = CreatePendingFetcher(
yqSharedResources,
diff --git a/ydb/core/yq/libs/test_connection/CMakeLists.txt b/ydb/core/yq/libs/test_connection/CMakeLists.txt
index 6c4b4aef64..adf2dec96f 100644
--- a/ydb/core/yq/libs/test_connection/CMakeLists.txt
+++ b/ydb/core/yq/libs/test_connection/CMakeLists.txt
@@ -15,11 +15,18 @@ target_link_libraries(yq-libs-test_connection PUBLIC
contrib-libs-cxxsupp
yutil
library-cpp-lwtrace
+ cpp-xml-document
libs-actors-logging
libs-config-protos
+ yq-libs-control_plane_storage
libs-test_connection-events
+ providers-solomon-async_io
)
target_sources(yq-libs-test_connection PRIVATE
- ${CMAKE_SOURCE_DIR}/ydb/core/yq/libs/test_connection/test_connection.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/yq/libs/test_connection/counters.cpp
${CMAKE_SOURCE_DIR}/ydb/core/yq/libs/test_connection/probes.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/yq/libs/test_connection/test_connection.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/yq/libs/test_connection/test_data_streams.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/yq/libs/test_connection/test_monitoring.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/yq/libs/test_connection/test_object_storage.cpp
)
diff --git a/ydb/core/yq/libs/test_connection/counters.cpp b/ydb/core/yq/libs/test_connection/counters.cpp
new file mode 100644
index 0000000000..92bd0d77bd
--- /dev/null
+++ b/ydb/core/yq/libs/test_connection/counters.cpp
@@ -0,0 +1,21 @@
+#include "counters.h"
+
+namespace NYq {
+
+TTestConnectionRequestCounters::TTestConnectionRequestCounters(const TString& name)
+ : Name(name)
+{}
+
+void TTestConnectionRequestCounters::Register(const NMonitoring::TDynamicCounterPtr& counters) {
+ auto requestCounters = counters->GetSubgroup("request", Name);
+ InFly = requestCounters->GetCounter("InFly", false);
+ Ok = requestCounters->GetCounter("Ok", true);
+ Error = requestCounters->GetCounter("Error", true);
+ LatencyMs = requestCounters->GetHistogram("LatencyMs", GetLatencyHistogramBuckets());
+}
+
+NMonitoring::IHistogramCollectorPtr TTestConnectionRequestCounters::GetLatencyHistogramBuckets() {
+ return NMonitoring::ExplicitHistogram({0, 1, 2, 5, 10, 20, 50, 100, 500, 1000, 2000, 5000, 10000, 30000, 50000, 500000});
+}
+
+} // NYq
diff --git a/ydb/core/yq/libs/test_connection/counters.h b/ydb/core/yq/libs/test_connection/counters.h
new file mode 100644
index 0000000000..9d0dc4b38b
--- /dev/null
+++ b/ydb/core/yq/libs/test_connection/counters.h
@@ -0,0 +1,25 @@
+#pragma once
+
+#include <library/cpp/monlib/dynamic_counters/counters.h>
+
+namespace NYq {
+
+class TTestConnectionRequestCounters: public virtual TThrRefBase {
+public:
+ const TString Name;
+ NMonitoring::TDynamicCounters::TCounterPtr InFly;
+ NMonitoring::TDynamicCounters::TCounterPtr Ok;
+ NMonitoring::TDynamicCounters::TCounterPtr Error;
+ NMonitoring::THistogramPtr LatencyMs;
+
+ explicit TTestConnectionRequestCounters(const TString& name);
+
+ void Register(const NMonitoring::TDynamicCounterPtr& counters);
+
+private:
+ static NMonitoring::IHistogramCollectorPtr GetLatencyHistogramBuckets();
+};
+
+using TTestConnectionRequestCountersPtr = TIntrusivePtr<TTestConnectionRequestCounters>;
+
+} // NYq
diff --git a/ydb/core/yq/libs/test_connection/probes.h b/ydb/core/yq/libs/test_connection/probes.h
index b1f1d51abc..133b18160d 100644
--- a/ydb/core/yq/libs/test_connection/probes.h
+++ b/ydb/core/yq/libs/test_connection/probes.h
@@ -3,10 +3,22 @@
#include <library/cpp/lwtrace/all.h>
#define YQ_TEST_CONNECTION_PROVIDER(PROBE, EVENT, GROUPS, TYPES, NAMES) \
- PROBE(TestConnectionRequest, \
+ PROBE(TestDataStreamsConnectionRequest, \
GROUPS(), \
- TYPES(TString, TString, TDuration, i64, bool, bool), \
- NAMES("scope", "user", "latencyMs", "size", "success", "timeout")) \
+ TYPES(TString, TString, TDuration, bool), \
+ NAMES("scope", "user", "latencyMs", "success")) \
+ PROBE(TestMonitoringConnectionRequest, \
+ GROUPS(), \
+ TYPES(TString, TString, TDuration, bool), \
+ NAMES("scope", "user", "latencyMs", "success")) \
+ PROBE(TestObjectStorageConnectionRequest, \
+ GROUPS(), \
+ TYPES(TString, TString, TDuration, bool), \
+ NAMES("scope", "user", "latencyMs", "success")) \
+ PROBE(TestUnsupportedConnectionRequest, \
+ GROUPS(), \
+ TYPES(TString, TString), \
+ NAMES("scope", "user")) \
// YQ_TEST_CONNECTION_PROVIDER
diff --git a/ydb/core/yq/libs/test_connection/request_validators.h b/ydb/core/yq/libs/test_connection/request_validators.h
new file mode 100644
index 0000000000..ec52cbb2cf
--- /dev/null
+++ b/ydb/core/yq/libs/test_connection/request_validators.h
@@ -0,0 +1,22 @@
+#pragma once
+
+#include <ydb/core/yq/libs/control_plane_storage/request_validators.h>
+
+namespace NYq {
+
+template<typename T>
+NYql::TIssues ValidateTestConnection(T& ev, size_t maxSize, const TSet<YandexQuery::ConnectionSetting::ConnectionCase>& availableConnections, bool disableCurrentIam, bool clickHousePasswordRequire = true)
+{
+ const auto& request = ev->Get()->Request;
+ NYql::TIssues issues = ValidateEvent(ev, maxSize);
+
+ if (!request.has_setting()) {
+ issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "content.setting field is not specified"));
+ }
+
+ const YandexQuery::ConnectionSetting& setting = request.setting();
+ issues.AddIssues(ValidateConnectionSetting(setting, availableConnections, disableCurrentIam, clickHousePasswordRequire));
+ return issues;
+}
+
+} // namespace NYq
diff --git a/ydb/core/yq/libs/test_connection/test_connection.cpp b/ydb/core/yq/libs/test_connection/test_connection.cpp
index b7148d0dbc..318dd6552f 100644
--- a/ydb/core/yq/libs/test_connection/test_connection.cpp
+++ b/ydb/core/yq/libs/test_connection/test_connection.cpp
@@ -1,27 +1,137 @@
#include "events/events.h"
#include "probes.h"
#include "test_connection.h"
+#include "request_validators.h"
+
+#include <ydb/core/mon/mon.h>
+#include <ydb/core/yq/libs/actors/database_resolver.h>
+#include <ydb/core/yq/libs/actors/proxy.h>
+#include <ydb/core/yq/libs/common/util.h>
+#include <ydb/core/yq/libs/config/yq_issue.h>
+#include <ydb/core/yq/libs/db_id_async_resolver_impl/db_async_resolver_impl.h>
+#include <ydb/core/yq/libs/control_plane_storage/config.h>
+
+#include <ydb/library/security/util.h>
#include <library/cpp/actors/core/actor_bootstrapped.h>
#include <library/cpp/lwtrace/mon/mon_lwtrace.h>
#include <library/cpp/monlib/service/pages/templates.h>
-#include <ydb/core/mon/mon.h>
-#include <ydb/core/yq/libs/config/yq_issue.h>
-
namespace NYq {
+LWTRACE_USING(YQ_TEST_CONNECTION_PROVIDER);
+
using namespace NActors;
class TTestConnectionActor : public NActors::TActorBootstrapped<TTestConnectionActor> {
- NMonitoring::TDynamicCounterPtr Counters;
+
+ enum ERequestTypeScope {
+ RTS_TEST_DATA_STREAMS_CONNECTION,
+ RTS_TEST_MONITORING_CONNECTION,
+ RTS_TEST_OBJECT_STORAGE_CONNECTION,
+ RTS_TEST_UNSUPPORTED_CONNECTION,
+ RTS_MAX,
+ };
+
+ class TCounters: public virtual TThrRefBase {
+ struct TMetricsScope {
+ TString CloudId;
+ TString Scope;
+
+ bool operator<(const TMetricsScope& right) const {
+ return std::tie(CloudId, Scope) < std::tie(right.CloudId, right.Scope);
+ }
+ };
+
+ using TScopeCounters = std::array<TTestConnectionRequestCountersPtr, RTS_MAX>;
+ using TScopeCountersPtr = std::shared_ptr<TScopeCounters>;
+
+ TMap<TMetricsScope, TScopeCountersPtr> ScopeCounters;
+ NMonitoring::TDynamicCounterPtr Counters;
+
+ ERequestTypeScope ToType(YandexQuery::ConnectionSetting::ConnectionCase connectionCase) {
+ switch (connectionCase) {
+ case YandexQuery::ConnectionSetting::kDataStreams:
+ return RTS_TEST_DATA_STREAMS_CONNECTION;
+ case YandexQuery::ConnectionSetting::kObjectStorage:
+ return RTS_TEST_OBJECT_STORAGE_CONNECTION;
+ case YandexQuery::ConnectionSetting::kMonitoring:
+ return RTS_TEST_MONITORING_CONNECTION;
+ default:
+ return RTS_TEST_UNSUPPORTED_CONNECTION;
+ }
+ }
+
+ public:
+ explicit TCounters(const NMonitoring::TDynamicCounterPtr& counters)
+ : Counters(counters)
+ {}
+
+ TTestConnectionRequestCountersPtr GetScopeCounters(const TString& cloudId, const TString& scope, YandexQuery::ConnectionSetting::ConnectionCase connectionCase) {
+ ERequestTypeScope type = ToType(connectionCase);
+ TMetricsScope key{cloudId, scope};
+ auto it = ScopeCounters.find(key);
+ if (it != ScopeCounters.end()) {
+ return (*it->second)[type];
+ }
+
+ auto scopeRequests = std::make_shared<TScopeCounters>(CreateArray<RTS_MAX, TTestConnectionRequestCountersPtr>({
+ { MakeIntrusive<TTestConnectionRequestCounters>("TestDataStreamsConnection") },
+ { MakeIntrusive<TTestConnectionRequestCounters>("TestMonitoringConnection") },
+ { MakeIntrusive<TTestConnectionRequestCounters>("TestObjectStorageConnection") },
+ { MakeIntrusive<TTestConnectionRequestCounters>("TestUnsupportedConnection") },
+ }));
+
+ auto scopeCounters = (cloudId ? Counters->GetSubgroup("cloud_id", cloudId) : Counters)
+ ->GetSubgroup("scope", scope);
+
+ for (auto& request: *scopeRequests) {
+ request->Register(scopeCounters);
+ }
+
+ ScopeCounters[key] = scopeRequests;
+ return (*scopeRequests)[type];
+ }
+ };
+
NConfig::TTestConnectionConfig Config;
+ ::NYq::TControlPlaneStorageConfig ControlPlaneStorageConfig;
+ NConfig::TCommonConfig CommonConfig;
+ NYq::TYqSharedResources::TPtr SharedResouces;
+ NYql::ISecuredServiceAccountCredentialsFactory::TPtr CredentialsFactory;
+ NPq::NConfigurationManager::IConnections::TPtr CmConnections;
+ const NKikimr::NMiniKQL::IFunctionRegistry* FunctionRegistry;
+ TCounters Counters;
+ NYq::TSigner::TPtr Signer;
+ TActorId DatabaseResolverActor;
+ std::shared_ptr<NYql::IDatabaseAsyncResolver> DbResolver;
+ NYql::IHTTPGateway::TPtr HttpGateway;
public:
- TTestConnectionActor(const NConfig::TTestConnectionConfig& config, const NMonitoring::TDynamicCounterPtr& counters)
- : Counters(counters)
- , Config(config)
+ TTestConnectionActor(
+ const NConfig::TTestConnectionConfig& config,
+ const NConfig::TControlPlaneStorageConfig& controlPlaneStorageConfig,
+ const NConfig::TCommonConfig& commonConfig,
+ const NConfig::TTokenAccessorConfig& tokenAccessorConfig,
+ const NYq::TYqSharedResources::TPtr& sharedResources,
+ const NYql::ISecuredServiceAccountCredentialsFactory::TPtr& credentialsFactory,
+ const NPq::NConfigurationManager::IConnections::TPtr& cmConnections,
+ const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
+ const NYql::IHTTPGateway::TPtr& httpGateway,
+ const NMonitoring::TDynamicCounterPtr& counters)
+ : Config(config)
+ , ControlPlaneStorageConfig(controlPlaneStorageConfig, commonConfig)
+ , CommonConfig(commonConfig)
+ , SharedResouces(sharedResources)
+ , CredentialsFactory(credentialsFactory)
+ , CmConnections(cmConnections)
+ , FunctionRegistry(functionRegistry)
+ , Counters(counters)
+ , HttpGateway(httpGateway)
{
+ if (tokenAccessorConfig.GetHmacSecretFile()) {
+ Signer = ::NYq::CreateSignerFromFile(tokenAccessorConfig.GetHmacSecretFile());
+ }
}
static constexpr char ActorName[] = "YQ_TEST_CONNECTION";
@@ -31,6 +141,12 @@ public:
NLwTraceMonPage::ProbeRegistry().AddProbesList(LWTRACE_GET_PROBES(YQ_TEST_CONNECTION_PROVIDER));
+ DatabaseResolverActor = Register(NYq::CreateDatabaseResolver(NYq::MakeYqlAnalyticsHttpProxyId(), CredentialsFactory));
+ DbResolver = std::make_shared<NYq::TDatabaseAsyncResolverImpl>(
+ NActors::TActivationContext::ActorSystem(), DatabaseResolverActor,
+ CommonConfig.GetYdbMvpCloudEndpoint(), CommonConfig.GetMdbGateway(),
+ CommonConfig.GetMdbTransformHost());
+
Become(&TTestConnectionActor::StateFunc);
}
@@ -40,9 +156,56 @@ public:
)
void Handle(TEvTestConnection::TEvTestConnectionRequest::TPtr& ev) {
+ NYql::TIssues issues = ValidateTestConnection(ev, ControlPlaneStorageConfig.Proto.GetMaxRequestSize(), ControlPlaneStorageConfig.AvailableConnections, ControlPlaneStorageConfig.Proto.GetDisableCurrentIam());
+ const TString& cloudId = ev->Get()->CloudId;
+ const TString& scope = ev->Get()->Scope;
+ const TString& user = ev->Get()->User;
+ const TString& token = ev->Get()->Token;
YandexQuery::TestConnectionRequest request = std::move(ev->Get()->Request);
- TC_LOG_T("TestConnectionRequest: " << request.DebugString());
- Send(ev->Sender, new TEvTestConnection::TEvTestConnectionResponse(NYql::TIssues{MakeErrorIssue(TIssuesIds::INTERNAL_ERROR, "Unimplemented yet")}), 0, ev->Cookie);
+ TTestConnectionRequestCountersPtr requestCounters = Counters.GetScopeCounters(cloudId, scope, request.setting().connection_case());
+ if (issues) {
+ requestCounters->Error->Inc();
+ TC_LOG_D("TestConnectionRequest: validation failed " << scope << " " << user << " " << NKikimr::MaskTicket(token) << issues.ToOneLineString());
+ Send(ev->Sender, new TEvTestConnection::TEvTestConnectionResponse(issues), 0, ev->Cookie);
+ return;
+ }
+
+ TC_LOG_T("TestConnectionRequest: " << scope << " " << user << " " << NKikimr::MaskTicket(token) << request.DebugString());
+ switch (request.setting().connection_case()) {
+ case YandexQuery::ConnectionSetting::kDataStreams: {
+ Register(CreateTestDataStreamsConnectionActor(
+ *request.mutable_setting()->mutable_data_streams(),
+ CommonConfig, DbResolver, ev->Sender,
+ ev->Cookie, SharedResouces,
+ CredentialsFactory, CmConnections, FunctionRegistry,
+ scope, user, token,
+ Signer, requestCounters));
+ break;
+ }
+ case YandexQuery::ConnectionSetting::kObjectStorage: {
+ Register(CreateTestObjectStorageConnectionActor(
+ *request.mutable_setting()->mutable_object_storage(),
+ CommonConfig, ev->Sender,
+ ev->Cookie, CredentialsFactory,
+ HttpGateway, scope, user, token,
+ Signer, requestCounters));
+ break;
+ }
+ case YandexQuery::ConnectionSetting::kMonitoring: {
+ Register(CreateTestMonitoringConnectionActor(
+ *request.mutable_setting()->mutable_monitoring(),
+ ev->Sender, ev->Cookie, CredentialsFactory,
+ scope, user, token,
+ Signer, requestCounters));
+ break;
+ }
+ default: {
+ LWPROBE(TestUnsupportedConnectionRequest, scope, user);
+ requestCounters->Error->Inc();
+ TC_LOG_E("TestConnectionRequest: unimplemented " << scope << " " << user << " " << NKikimr::MaskTicket(token) << request.DebugString());
+ Send(ev->Sender, new TEvTestConnection::TEvTestConnectionResponse(NYql::TIssues{MakeErrorIssue(TIssuesIds::INTERNAL_ERROR, "Unimplemented yet")}), 0, ev->Cookie);
+ }
+ }
}
void Handle(NMon::TEvHttpInfo::TPtr& ev) {
@@ -63,8 +226,21 @@ NActors::TActorId TestConnectionActorId() {
return NActors::TActorId(0, name);
}
-NActors::IActor* CreateTestConnectionActor(const NConfig::TTestConnectionConfig& config, const NMonitoring::TDynamicCounterPtr& counters) {
- return new TTestConnectionActor(config, counters);
+NActors::IActor* CreateTestConnectionActor(
+ const NConfig::TTestConnectionConfig& config,
+ const NConfig::TControlPlaneStorageConfig& controlPlaneStorageConfig,
+ const NConfig::TCommonConfig& commonConfig,
+ const NConfig::TTokenAccessorConfig& tokenAccessorConfig,
+ const NYq::TYqSharedResources::TPtr& sharedResources,
+ const NYql::ISecuredServiceAccountCredentialsFactory::TPtr& credentialsFactory,
+ const NPq::NConfigurationManager::IConnections::TPtr& cmConnections,
+ const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
+ const NYql::IHTTPGateway::TPtr& httpGateway,
+ const NMonitoring::TDynamicCounterPtr& counters) {
+ return new TTestConnectionActor(config, controlPlaneStorageConfig, commonConfig,
+ tokenAccessorConfig, sharedResources,
+ credentialsFactory, cmConnections,
+ functionRegistry, httpGateway, counters);
}
} // namespace NYq
diff --git a/ydb/core/yq/libs/test_connection/test_connection.h b/ydb/core/yq/libs/test_connection/test_connection.h
index 7be25e8e57..258b6f86ac 100644
--- a/ydb/core/yq/libs/test_connection/test_connection.h
+++ b/ydb/core/yq/libs/test_connection/test_connection.h
@@ -1,7 +1,14 @@
#pragma once
+#include "counters.h"
+
#include <ydb/core/yq/libs/actors/logging/log.h>
#include <ydb/core/yq/libs/config/protos/test_connection.pb.h>
+#include <ydb/core/yq/libs/shared_resources/shared_resources.h>
+#include <ydb/core/yq/libs/signer/signer.h>
+#include <ydb/library/yql/providers/common/token_accessor/client/factory.h>
+#include <ydb/library/yql/providers/pq/cm_client/interface/client.h>
+#include <ydb/public/api/protos/yq.pb.h>
#include <library/cpp/actors/core/actor.h>
#include <library/cpp/monlib/dynamic_counters/counters.h>
@@ -18,10 +25,71 @@
#define TC_LOG_T(s) \
LOG_YQ_TEST_CONNECTION_TRACE(s)
+#define TC_LOG_AS_D(a, s) \
+ LOG_YQ_TEST_CONNECTION_AS_DEBUG(a, s)
+#define TC_LOG_AS_I(a, s) \
+ LOG_YQ_TEST_CONNECTION_AS_INFO(a, s)
+#define TC_LOG_AS_W(a, s) \
+ LOG_YQ_TEST_CONNECTION_AS_WARN(a, s)
+#define TC_LOG_AS_E(a, s) \
+ LOG_YQ_TEST_CONNECTION_AS_ERROR(a, s)
+#define TC_LOG_AS_T(a, s) \
+ LOG_YQ_TEST_CONNECTION_AS_TRACE(a, s)
+
namespace NYq {
NActors::TActorId TestConnectionActorId();
-NActors::IActor* CreateTestConnectionActor(const NConfig::TTestConnectionConfig& config, const NMonitoring::TDynamicCounterPtr& counters);
+NActors::IActor* CreateTestConnectionActor(
+ const NConfig::TTestConnectionConfig& config,
+ const NConfig::TControlPlaneStorageConfig& controlPlaneStorageConfig,
+ const NConfig::TCommonConfig& commonConfig,
+ const NConfig::TTokenAccessorConfig& tokenAccessorConfig,
+ const NYq::TYqSharedResources::TPtr& sharedResources,
+ const NYql::ISecuredServiceAccountCredentialsFactory::TPtr& credentialsFactory,
+ const NPq::NConfigurationManager::IConnections::TPtr& cmConnections,
+ const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
+ const NYql::IHTTPGateway::TPtr& httpGateway,
+ const NMonitoring::TDynamicCounterPtr& counters);
+
+NActors::IActor* CreateTestDataStreamsConnectionActor(
+ const YandexQuery::DataStreams& ds,
+ const NYq::NConfig::TCommonConfig& commonConfig,
+ const std::shared_ptr<NYql::IDatabaseAsyncResolver>& dbResolver,
+ const NActors::TActorId& sender,
+ ui64 cookie,
+ const NYq::TYqSharedResources::TPtr& sharedResources,
+ const NYql::ISecuredServiceAccountCredentialsFactory::TPtr& credentialsFactory,
+ const ::NPq::NConfigurationManager::IConnections::TPtr& cmConnections,
+ const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
+ const TString& scope,
+ const TString& user,
+ const TString& token,
+ const NYq::TSigner::TPtr& signer,
+ const TTestConnectionRequestCountersPtr& counters);
+
+NActors::IActor* CreateTestObjectStorageConnectionActor(
+ const YandexQuery::ObjectStorageConnection& os,
+ const NYq::NConfig::TCommonConfig& commonConfig,
+ const NActors::TActorId& sender,
+ ui64 cookie,
+ const NYql::ISecuredServiceAccountCredentialsFactory::TPtr& credentialsFactory,
+ NYql::IHTTPGateway::TPtr gateway,
+ const TString& scope,
+ const TString& user,
+ const TString& token,
+ const NYq::TSigner::TPtr& signer,
+ const TTestConnectionRequestCountersPtr& counters);
+
+NActors::IActor* CreateTestMonitoringConnectionActor(
+ const YandexQuery::Monitoring& monitoring,
+ const NActors::TActorId& sender,
+ ui64 cookie,
+ const NYql::ISecuredServiceAccountCredentialsFactory::TPtr& credentialsFactory,
+ const TString& scope,
+ const TString& user,
+ const TString& token,
+ const NYq::TSigner::TPtr& signer,
+ const TTestConnectionRequestCountersPtr& counters);
} // namespace NYq
diff --git a/ydb/core/yq/libs/test_connection/test_data_streams.cpp b/ydb/core/yq/libs/test_connection/test_data_streams.cpp
new file mode 100644
index 0000000000..1bcab3697d
--- /dev/null
+++ b/ydb/core/yq/libs/test_connection/test_data_streams.cpp
@@ -0,0 +1,293 @@
+#include "events/events.h"
+#include "probes.h"
+#include "test_connection.h"
+
+#include <library/cpp/actors/core/actor_bootstrapped.h>
+
+#include <ydb/core/yq/libs/actors/clusters_from_connections.h>
+#include <ydb/core/yq/libs/config/yq_issue.h>
+#include <ydb/core/yq/libs/db_id_async_resolver_impl/db_async_resolver_impl.h>
+#include <ydb/library/security/util.h>
+#include <ydb/library/yql/providers/common/structured_token/yql_token_builder.h>
+#include <ydb/library/yql/providers/pq/gateway/native/yql_pq_gateway.h>
+#include <ydb/library/yql/providers/pq/provider/yql_pq_gateway.h>
+
+namespace {
+
+struct TEvPrivate {
+ enum EEv {
+ EvResolveDbResponse = EventSpaceBegin(NActors::TEvents::ES_PRIVATE),
+ EvOpenSessionResponse,
+ EvCheckListStreamsResponse,
+ EvEnd
+ };
+
+ static_assert(EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE), "expect EvEnd < EventSpaceEnd(TEvents::ES_PRIVATE)");
+
+ struct TEvResolveDbResponse : NActors::TEventLocal<TEvResolveDbResponse, EvResolveDbResponse> {
+ NYql::TDbResolverResponse Result;
+
+ TEvResolveDbResponse(const NYql::TDbResolverResponse& result)
+ : Result(result)
+ {}
+ };
+
+ struct TEvOpenSessionResponse : NActors::TEventLocal<TEvOpenSessionResponse, EvOpenSessionResponse> {
+ bool IsSuccess = false;
+ TString ErrorMessage;
+
+ TEvOpenSessionResponse(const TString& errorMessage)
+ : IsSuccess(false)
+ , ErrorMessage(errorMessage)
+ {}
+
+ TEvOpenSessionResponse()
+ : IsSuccess(true)
+ {}
+ };
+
+ struct TEvCheckListStreamsResponse : NActors::TEventLocal<TEvCheckListStreamsResponse, EvCheckListStreamsResponse> {
+ bool IsSuccess = false;
+ TString ErrorMessage;
+
+ TEvCheckListStreamsResponse(const TString& errorMessage)
+ : IsSuccess(false)
+ , ErrorMessage(errorMessage)
+ {}
+
+ TEvCheckListStreamsResponse()
+ : IsSuccess(true)
+ {}
+ };
+};
+
+}
+
+namespace NYq {
+
+LWTRACE_USING(YQ_TEST_CONNECTION_PROVIDER);
+
+using namespace NActors;
+
+class TTestDataStreamsConnectionActor : public NActors::TActorBootstrapped<TTestDataStreamsConnectionActor> {
+ inline static const TString SessionName = "test_connection_data_streams";
+
+ NYq::NConfig::TCommonConfig CommonConfig;
+ TActorId Sender;
+ ui64 Cookie;
+ TString Scope;
+ TString User;
+ TString Token;
+ TTestConnectionRequestCountersPtr Counters;
+ NYq::TYqSharedResources::TPtr SharedResources;
+ NYql::ISecuredServiceAccountCredentialsFactory::TPtr CredentialsFactory;
+ ::NPq::NConfigurationManager::IConnections::TPtr CmConnections;
+ const NKikimr::NMiniKQL::IFunctionRegistry* FunctionRegistry;
+ std::shared_ptr<NYql::IDatabaseAsyncResolver> DbResolver;
+ NYql::TPqClusterConfig ClusterConfig{};
+ TString StructuredToken{};
+ NYql::IPqGateway::TPtr Gateway{};
+ const TInstant StartTime = TInstant::Now();
+
+public:
+ TTestDataStreamsConnectionActor(
+ const YandexQuery::DataStreams& ds,
+ const NYq::NConfig::TCommonConfig& commonConfig,
+ const std::shared_ptr<NYql::IDatabaseAsyncResolver>& dbResolver,
+ const TActorId& sender,
+ ui64 cookie,
+ const NYq::TYqSharedResources::TPtr& sharedResources,
+ const NYql::ISecuredServiceAccountCredentialsFactory::TPtr& credentialsFactory,
+ const ::NPq::NConfigurationManager::IConnections::TPtr& cmConnections,
+ const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
+ const TString& scope,
+ const TString& user,
+ const TString& token,
+ const NYq::TSigner::TPtr& signer,
+ const TTestConnectionRequestCountersPtr& counters)
+ : CommonConfig(commonConfig)
+ , Sender(sender)
+ , Cookie(cookie)
+ , Scope(scope)
+ , User(user)
+ , Token(token)
+ , Counters(counters)
+ , SharedResources(sharedResources)
+ , CredentialsFactory(credentialsFactory)
+ , CmConnections(cmConnections)
+ , FunctionRegistry(functionRegistry)
+ , DbResolver(dbResolver)
+ , ClusterConfig(CreateClusterConfig(SessionName, CommonConfig, Token, signer, ds))
+ , StructuredToken(NYql::ComposeStructuredTokenJsonForServiceAccount(ClusterConfig.GetServiceAccountId(), ClusterConfig.GetServiceAccountIdSignature(), ClusterConfig.GetToken()))
+ {
+ Counters->InFly->Inc();
+ }
+
+ static constexpr char ActorName[] = "YQ_TEST_DATA_STREAMS_CONNECTION";
+
+ void Bootstrap() {
+ TC_LOG_D(Scope << " " << User << " " << NKikimr::MaskTicket(Token) << " Starting test data stream connection actor. Actor id: " << SelfId());
+ TC_LOG_T(Scope << " " << User << " " << NKikimr::MaskTicket(Token) << " Structured token: " << StructuredToken << " service-account: " << ClusterConfig.GetServiceAccountId() << " signature: " << ClusterConfig.GetServiceAccountIdSignature() << " token: " << NKikimr::MaskTicket(ClusterConfig.GetToken()));
+ Become(&TTestDataStreamsConnectionActor::StateFunc);
+ SendResolveDatabaseId();
+ }
+
+ STRICT_STFUNC(StateFunc,
+ hFunc(TEvPrivate::TEvResolveDbResponse, Handler);
+ hFunc(TEvPrivate::TEvOpenSessionResponse, Handler);
+ hFunc(TEvPrivate::TEvCheckListStreamsResponse, Handler);
+ )
+
+private:
+ void SendResolveDatabaseId() {
+ if (ClusterConfig.GetDatabase()) {
+ TC_LOG_T(Scope << " " << User << " " << NKikimr::MaskTicket(Token) << " Database from connection settings " << ClusterConfig.GetDatabase());
+ SendOpenSession();
+ return;
+ }
+ THashMap<std::pair<TString, NYql::DatabaseType>, NYql::TDatabaseAuth> ids;
+ ids[std::pair{ClusterConfig.GetDatabaseId(), NYql::DatabaseType::DataStreams}] = {StructuredToken, CommonConfig.GetUseBearerForYdb()};
+ DbResolver->ResolveIds(ids).Apply([self=SelfId(), as=TActivationContext::ActorSystem()](const auto& future) {
+ try {
+ auto result = future.GetValue();
+ as->Send(new IEventHandle(self, self, new TEvPrivate::TEvResolveDbResponse(result), 0));
+ } catch (...) {
+ as->Send(new IEventHandle(self, self, new TEvPrivate::TEvResolveDbResponse(NYql::TDbResolverResponse{{}, false, NYql::TIssues{MakeErrorIssue(NYq::TIssuesIds::BAD_REQUEST, CurrentExceptionMessage())}}), 0));
+ }
+ });
+ }
+
+ void Handler(TEvPrivate::TEvResolveDbResponse::TPtr& ev) {
+ const auto& response = ev->Get()->Result;
+ if (!response.Success) {
+ TC_LOG_T(Scope << " " << User << " " << NKikimr::MaskTicket(Token) << " Resolve datababse id " << ClusterConfig.GetDatabaseId() << " error " << response.Issues.ToOneLineString());
+ ReplyError(response.Issues);
+ return;
+ }
+
+ auto it = response.DatabaseId2Endpoint.find(std::pair{ClusterConfig.GetDatabaseId(), NYql::DatabaseType::DataStreams});
+ if (it == response.DatabaseId2Endpoint.end()) {
+ TC_LOG_E(Scope << " " << User << " " << NKikimr::MaskTicket(Token) << " Test data streams connection: database is not found for database_id " << ClusterConfig.GetDatabaseId());
+ ReplyError(TStringBuilder{} << "Test data streams connection: database is not found for database_id " << ClusterConfig.GetDatabaseId());
+ return;
+ }
+
+ TC_LOG_T(Scope << " " << User << " " << NKikimr::MaskTicket(Token) << " Resolve datababse id result: " << it->second.Database);
+ ClusterConfig.SetDatabase(it->second.Database);
+ ClusterConfig.SetEndpoint(it->second.Endpoint);
+ ClusterConfig.SetUseSsl(it->second.Secure);
+ SendOpenSession();
+ }
+
+ void SendOpenSession() {
+ Gateway = NYql::CreatePqNativeGateway(CreateGatewayServices());
+ Gateway->OpenSession(SessionName, {}).Apply([self=SelfId(), as=TActivationContext::ActorSystem()](const auto& future) {
+ try {
+ future.TryRethrow();
+ as->Send(new IEventHandle(self, self, new TEvPrivate::TEvOpenSessionResponse(), 0));
+ } catch (...) {
+ as->Send(new IEventHandle(self, self, new TEvPrivate::TEvOpenSessionResponse(CurrentExceptionMessage()), 0));
+ }
+ });
+ }
+
+ void Handler(TEvPrivate::TEvOpenSessionResponse::TPtr& ev) {
+ const auto& response = *ev->Get();
+ if (!response.IsSuccess) {
+ TC_LOG_T(Scope << " " << User << " " << NKikimr::MaskTicket(Token) << " Open session error " << response.ErrorMessage);
+ ReplyError(response.ErrorMessage);
+ return;
+ }
+ TC_LOG_T(Scope << " " << User << " " << NKikimr::MaskTicket(Token) << " Open session: ok");
+ SendCheckListStreams();
+ }
+
+ void SendCheckListStreams() {
+ Gateway->ListStreams(SessionName, SessionName, ClusterConfig.GetDatabase(), StructuredToken, 1).Apply([self=SelfId(), as=TActivationContext::ActorSystem()](const auto& future) {
+ try {
+ future.TryRethrow();
+ as->Send(new IEventHandle(self, self, new TEvPrivate::TEvCheckListStreamsResponse(), 0));
+ } catch (...) {
+ as->Send(new IEventHandle(self, self, new TEvPrivate::TEvCheckListStreamsResponse(CurrentExceptionMessage()), 0));
+ }
+ });
+ }
+
+ void Handler(TEvPrivate::TEvCheckListStreamsResponse::TPtr& ev) {
+ const auto& response = *ev->Get();
+ if (!response.IsSuccess) {
+ TC_LOG_T(Scope << " " << User << " " << NKikimr::MaskTicket(Token) << " Check list strems error " << response.ErrorMessage);
+ ReplyError(response.ErrorMessage);
+ return;
+ }
+ TC_LOG_T(Scope << " " << User << " " << NKikimr::MaskTicket(Token) << " Check list streams: ok");
+ ReplyOk();
+ }
+
+ void DestroyActor(bool success = true) {
+ Counters->InFly->Dec();
+ TDuration delta = TInstant::Now() - StartTime;
+ Counters->LatencyMs->Collect(delta.MilliSeconds());
+ LWPROBE(TestDataStreamsConnectionRequest, Scope, User, delta, success);
+ PassAway();
+ }
+
+ void ReplyError(const NYql::TIssues& issues) {
+ Counters->Error->Inc();
+ Send(Sender, new NYq::TEvTestConnection::TEvTestConnectionResponse(issues), 0, Cookie);
+ DestroyActor(false /* success */);
+ }
+
+ void ReplyError(const TString& message) {
+ ReplyError(NYql::TIssues{MakeErrorIssue(NYq::TIssuesIds::BAD_REQUEST, message)});
+ }
+
+ void ReplyOk() {
+ Counters->Ok->Inc();
+ Send(Sender, new NYq::TEvTestConnection::TEvTestConnectionResponse(YandexQuery::TestConnectionResult{}), 0, Cookie);
+ DestroyActor();
+ }
+
+ static NYql::TPqClusterConfig CreateClusterConfig(const TString& sessionName, const NYq::NConfig::TCommonConfig& commonConfig, const TString& token, const NYq::TSigner::TPtr& signer, const YandexQuery::DataStreams& ds) {
+ const auto& auth = ds.auth();
+ const TString signedAccountId = signer && auth.has_service_account() ? signer->SignAccountId(auth.service_account().id()) : TString{};
+ return NYq::CreatePqClusterConfig(sessionName, commonConfig.GetUseBearerForYdb(), token, signedAccountId, ds);
+ }
+
+ NYql::TPqGatewayServices CreateGatewayServices() {
+ NYql::TPqGatewayConfig config;
+ *config.AddClusterMapping() = ClusterConfig;
+ NYql::TPqGatewayServices pqServices(
+ SharedResources->UserSpaceYdbDriver,
+ CmConnections,
+ CredentialsFactory,
+ std::make_shared<NYql::TPqGatewayConfig>(config),
+ FunctionRegistry
+ );
+ return pqServices;
+ }
+};
+
+NActors::IActor* CreateTestDataStreamsConnectionActor(
+ const YandexQuery::DataStreams& ds,
+ const NYq::NConfig::TCommonConfig& commonConfig,
+ const std::shared_ptr<NYql::IDatabaseAsyncResolver>& dbResolver,
+ const TActorId& sender,
+ ui64 cookie,
+ const NYq::TYqSharedResources::TPtr& sharedResources,
+ const NYql::ISecuredServiceAccountCredentialsFactory::TPtr& credentialsFactory,
+ const ::NPq::NConfigurationManager::IConnections::TPtr& cmConnections,
+ const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
+ const TString& scope,
+ const TString& user,
+ const TString& token,
+ const NYq::TSigner::TPtr& signer,
+ const TTestConnectionRequestCountersPtr& counters) {
+ return new TTestDataStreamsConnectionActor(
+ ds, commonConfig, dbResolver, sender,
+ cookie, sharedResources, credentialsFactory,
+ cmConnections, functionRegistry,
+ scope, user, token, signer, counters);
+}
+
+} // namespace NYq
diff --git a/ydb/core/yq/libs/test_connection/test_monitoring.cpp b/ydb/core/yq/libs/test_connection/test_monitoring.cpp
new file mode 100644
index 0000000000..fd2e7a1e54
--- /dev/null
+++ b/ydb/core/yq/libs/test_connection/test_monitoring.cpp
@@ -0,0 +1,160 @@
+#include "probes.h"
+#include "test_connection.h"
+
+#include <ydb/core/yq/libs/actors/clusters_from_connections.h>
+#include <ydb/core/yq/libs/config/yq_issue.h>
+#include <ydb/core/yq/libs/test_connection/events/events.h>
+#include <ydb/library/security/util.h>
+
+#include <ydb/library/yql/providers/common/structured_token/yql_token_builder.h>
+#include <ydb/library/yql/utils/url_builder.h>
+
+#include <ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.h>
+#include <ydb/library/yql/utils/actors/http_sender_actor.h>
+
+#include <library/cpp/actors/core/actor_bootstrapped.h>
+
+namespace NYq {
+
+LWTRACE_USING(YQ_TEST_CONNECTION_PROVIDER);
+
+using namespace NActors;
+
+class TTestMonitoringConnectionActor : public NActors::TActorBootstrapped<TTestMonitoringConnectionActor> {
+ TActorId Sender;
+ TActorId HttpProxyId;
+ ui64 Cookie;
+ TString Scope;
+ TString User;
+ TString Token;
+ TTestConnectionRequestCountersPtr Counters;
+ NYql::ISecuredServiceAccountCredentialsFactory::TPtr CredentialsFactory;
+ NYq::TSigner::TPtr Signer;
+ NYql::TSolomonClusterConfig ClusterConfig;
+ const TInstant StartTime = TInstant::Now();
+
+public:
+ TTestMonitoringConnectionActor(
+ const YandexQuery::Monitoring& monitoring,
+ const TActorId& sender,
+ ui64 cookie,
+ const NYql::ISecuredServiceAccountCredentialsFactory::TPtr& credentialsFactory,
+ const TString& scope,
+ const TString& user,
+ const TString& token,
+ const NYq::TSigner::TPtr& signer,
+ const TTestConnectionRequestCountersPtr& counters)
+ : Sender(sender)
+ , Cookie(cookie)
+ , Scope(scope)
+ , User(user)
+ , Token(token)
+ , Counters(counters)
+ , CredentialsFactory(credentialsFactory)
+ , Signer(signer)
+ , ClusterConfig(NYq::CreateSolomonClusterConfig({}, token, signer ? signer->SignAccountId(monitoring.auth().service_account().id()) : "", monitoring))
+ {
+ Counters->InFly->Inc();
+ }
+
+ static constexpr char ActorName[] = "YQ_TEST_MONITORING_CONNECTION";
+
+ void Bootstrap() {
+ TC_LOG_D(Scope << " " << User << " " << NKikimr::MaskTicket(Token) << " Starting test monitoring connection actor. Actor id: " << SelfId());
+ Become(&TTestMonitoringConnectionActor::StateFunc);
+
+ HttpProxyId = Register(NHttp::CreateHttpProxy(NMonitoring::TMetricRegistry::SharedInstance()));
+ const NHttp::THttpOutgoingRequestPtr httpRequest = BuildSolomonRequest();
+ const TActorId httpSenderId = Register(NYql::NDq::CreateHttpSenderActor(SelfId(), HttpProxyId));
+ Send(httpSenderId, new NHttp::TEvHttpProxy::TEvHttpOutgoingRequest(httpRequest), /*flags=*/0, Cookie);
+ TC_LOG_T(Scope << " " << User << " " << NKikimr::MaskTicket(Token) << " send request " << httpRequest->Method << " " << httpRequest->Protocol << " " << httpRequest->Host << " " << httpRequest->URL << " " << httpRequest->Body);
+ }
+
+ void FillAuth(NHttp::THttpOutgoingRequestPtr& httpRequest) {
+ const TString authorizationHeader = "Authorization";
+ const auto structedToken = NYql::ComposeStructuredTokenJsonForServiceAccount(ClusterConfig.GetServiceAccountId(), ClusterConfig.GetServiceAccountIdSignature(), ClusterConfig.GetToken());
+ const auto credentialsProviderFactory = CreateCredentialsProviderFactoryForStructuredToken(CredentialsFactory, structedToken);
+ const auto authToken = credentialsProviderFactory->CreateProvider()->GetAuthInfo();
+
+ switch (static_cast<NYql::NSo::NProto::ESolomonClusterType>(ClusterConfig.GetClusterType())) {
+ case NYql::NSo::NProto::ESolomonClusterType::CT_SOLOMON:
+ httpRequest->Set(authorizationHeader, "OAuth " + authToken);
+ break;
+ case NYql::NSo::NProto::ESolomonClusterType::CT_MONITORING:
+ httpRequest->Set(authorizationHeader, "Bearer " + authToken);
+ break;
+ default:
+ Y_ENSURE(false, "Invalid cluster type " << ToString<ui32>(ClusterConfig.GetClusterType()));
+ }
+ }
+
+ NHttp::THttpOutgoingRequestPtr BuildSolomonRequest() {
+ const TString url = NYql::NDq::GetSolomonUrl(ClusterConfig.GetCluster(),
+ ClusterConfig.GetUseSsl(),
+ ClusterConfig.GetPath().GetProject(),
+ ClusterConfig.GetPath().GetCluster(),
+ {},
+ static_cast<NYql::NSo::NProto::ESolomonClusterType>(ClusterConfig.GetClusterType()));
+ NHttp::THttpOutgoingRequestPtr httpRequest = NHttp::THttpOutgoingRequest::CreateRequestPost(url);
+ FillAuth(httpRequest);
+ httpRequest->Set<&NHttp::THttpRequest::ContentType>("application/json");
+ httpRequest->Set<&NHttp::THttpRequest::Body>("{}");
+ return httpRequest;
+ }
+
+ STRICT_STFUNC(StateFunc,
+ hFunc(NYql::NDq::TEvHttpBase::TEvSendResult, Handle);
+ )
+
+ void Handle(NYql::NDq::TEvHttpBase::TEvSendResult::TPtr& ev) {
+ const auto* res = ev->Get();
+ if (res->HttpIncomingResponse->Get()->Response->Status == "400") {
+ TC_LOG_T(Scope << " " << User << " " << NKikimr::MaskTicket(Token) << " ok " << res->HttpIncomingResponse->Get()->ToString());
+ ReplyOk();
+ return;
+ }
+
+ const TString& error = res->HttpIncomingResponse->Get()->GetError();
+ TC_LOG_T(Scope << " " << User << " " << NKikimr::MaskTicket(Token) << " access problem " << res->HttpIncomingResponse->Get()->ToString() << " " << error);
+ ReplyError(error);
+ }
+
+ void DestroyActor(bool isSuccess) {
+ Counters->InFly->Dec();
+ TDuration delta = TInstant::Now() - StartTime;
+ Counters->LatencyMs->Collect(delta.MilliSeconds());
+ LWPROBE(TestMonitoringConnectionRequest, Scope, User, delta, isSuccess);
+ Send(HttpProxyId, new NActors::TEvents::TEvPoison());
+ PassAway();
+ }
+
+ void ReplyError(const TString& message) {
+ Counters->Error->Inc();
+ Send(Sender, new NYq::TEvTestConnection::TEvTestConnectionResponse(NYql::TIssues{MakeErrorIssue(NYq::TIssuesIds::BAD_REQUEST, message)}), Cookie);
+ DestroyActor(false /* success */);
+ }
+
+ void ReplyOk() {
+ Counters->Ok->Inc();
+ Send(Sender, new NYq::TEvTestConnection::TEvTestConnectionResponse(YandexQuery::TestConnectionResult{}), Cookie);
+ DestroyActor(true /* success */);
+ }
+};
+
+NActors::IActor* CreateTestMonitoringConnectionActor(
+ const YandexQuery::Monitoring& monitoring,
+ const TActorId& sender,
+ ui64 cookie,
+ const NYql::ISecuredServiceAccountCredentialsFactory::TPtr& credentialsFactory,
+ const TString& scope,
+ const TString& user,
+ const TString& token,
+ const NYq::TSigner::TPtr& signer,
+ const TTestConnectionRequestCountersPtr& counters) {
+ return new TTestMonitoringConnectionActor(
+ monitoring, sender,
+ cookie, credentialsFactory,
+ scope, user, token, signer, counters);
+}
+
+} // namespace NYq
diff --git a/ydb/core/yq/libs/test_connection/test_object_storage.cpp b/ydb/core/yq/libs/test_connection/test_object_storage.cpp
new file mode 100644
index 0000000000..80832654cf
--- /dev/null
+++ b/ydb/core/yq/libs/test_connection/test_object_storage.cpp
@@ -0,0 +1,223 @@
+#include "events/events.h"
+#include "probes.h"
+#include "test_connection.h"
+
+#include <library/cpp/actors/core/actor_bootstrapped.h>
+
+#include <ydb/core/yq/libs/actors/clusters_from_connections.h>
+#include <ydb/core/yq/libs/config/yq_issue.h>
+#include <ydb/library/security/util.h>
+
+#include <ydb/library/yql/providers/s3/provider/yql_s3_provider_impl.h>
+#include <ydb/library/yql/providers/common/structured_token/yql_token_builder.h>
+#include <ydb/library/yql/utils/url_builder.h>
+
+#ifdef THROW
+#undef THROW
+#endif
+#include <library/cpp/xml/document/xml-document.h>
+
+namespace {
+
+struct TEvPrivate {
+ enum EEv {
+ EvDiscoveryResponse = EventSpaceBegin(NActors::TEvents::ES_PRIVATE),
+ EvEnd
+ };
+
+ static_assert(EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE), "expect EvEnd < EventSpaceEnd(TEvents::ES_PRIVATE)");
+
+ struct TEvDiscoveryResponse : NActors::TEventLocal<TEvDiscoveryResponse, EvDiscoveryResponse> {
+ bool IsSuccess = false;
+ TString ErrorMessage;
+
+ TEvDiscoveryResponse(const TString& errorMessage)
+ : IsSuccess(false)
+ , ErrorMessage(errorMessage)
+ {}
+
+ TEvDiscoveryResponse()
+ : IsSuccess(true)
+ {}
+ };
+};
+
+}
+
+namespace NYq {
+
+LWTRACE_USING(YQ_TEST_CONNECTION_PROVIDER);
+
+using namespace NActors;
+
+class TTestObjectStorageConnectionActor : public NActors::TActorBootstrapped<TTestObjectStorageConnectionActor> {
+ TTestConnectionRequestCountersPtr Counters;
+ TActorId Sender;
+ TString Scope;
+ TString User;
+ TString Token;
+ ui64 Cookie;
+ NYql::IHTTPGateway::TPtr Gateway;
+ NYql::ISecuredServiceAccountCredentialsFactory::TPtr CredentialsFactory;
+ NYql::TS3ClusterConfig ClusterConfig;
+
+ const TInstant StartTime = TInstant::Now();
+
+public:
+ TTestObjectStorageConnectionActor(
+ const YandexQuery::ObjectStorageConnection& os,
+ const NYq::NConfig::TCommonConfig& commonConfig,
+ const TActorId& sender,
+ ui64 cookie,
+ const NYql::ISecuredServiceAccountCredentialsFactory::TPtr& credentialsFactory,
+ NYql::IHTTPGateway::TPtr gateway,
+ const TString& scope,
+ const TString& user,
+ const TString& token,
+ const NYq::TSigner::TPtr& signer,
+ const TTestConnectionRequestCountersPtr& counters)
+ : Counters(counters)
+ , Sender(sender)
+ , Scope(scope)
+ , User(user)
+ , Token(token)
+ , Cookie(cookie)
+ , Gateway(gateway)
+ , CredentialsFactory(credentialsFactory)
+ , ClusterConfig(NYq::CreateS3ClusterConfig({}, token, commonConfig.GetObjectStorageEndpoint(), signer ? signer->SignAccountId(os.auth().service_account().id()) : "", os))
+ {
+ Counters->InFly->Inc();
+ }
+
+ static constexpr char ActorName[] = "YQ_TEST_OBJECT_STORAGE_CONNECTION";
+
+ void Bootstrap() {
+ Become(&TTestObjectStorageConnectionActor::StateFunc);
+ TC_LOG_D(Scope << " " << User << " " << NKikimr::MaskTicket(Token) << " Starting test object storage connection actor. Actor id: " << SelfId());
+ SendDiscover();
+ }
+
+ STRICT_STFUNC(StateFunc,
+ hFunc(TEvPrivate::TEvDiscoveryResponse, Handle);
+ )
+
+ void Handle(TEvPrivate::TEvDiscoveryResponse::TPtr& ev) {
+ const auto& response = *ev->Get();
+ if (response.IsSuccess) {
+ ReplyOk();
+ } else {
+ ReplyError(response.ErrorMessage);
+ }
+ }
+
+private:
+ static void SendError(TActorId self, NActors::TActorSystem* as, const TString& errorMessage) {
+ as->Send(new IEventHandle(self, self, new TEvPrivate::TEvDiscoveryResponse(errorMessage), 0));
+ }
+
+ static void SendOk(TActorId self, NActors::TActorSystem* as) {
+ as->Send(new IEventHandle(self, self, new TEvPrivate::TEvDiscoveryResponse(), 0));
+ }
+
+ static void DiscoveryCallback(NYql::IHTTPGateway::TResult&& result, TActorId self, NActors::TActorSystem* as) {
+ switch (result.index()) {
+ case 0U: try {
+ const NXml::TDocument xml(std::get<NYql::IHTTPGateway::TContent>(std::move(result)).Extract(), NXml::TDocument::String);
+ if (const auto& root = xml.Root(); root.Name() == "Error") {
+ const auto& code = root.Node("Code", true).Value<TString>();
+ const auto& message = root.Node("Message", true).Value<TString>();
+ SendError(self, as, TStringBuilder() << message << ", code: " << code);
+ return;
+ } else if (root.Name() != "ListBucketResult") {
+ SendError(self, as, TStringBuilder() << "Unexpected response '" << root.Name() << "' on discovery.");
+ return;
+ } else {
+ break;
+ }
+ } catch (const std::exception& ex) {
+ SendError(self, as, TStringBuilder() << "Exception occurred: " << ex.what());
+ return;
+ }
+ case 1U:
+ SendError(self, as, TStringBuilder() << "Issues occurred: " << std::get<NYql::TIssues>(result).ToString());
+ return;
+ default:
+ SendError(self, as, TStringBuilder() << "Undefined variant index: " << result.index());
+ return;
+ }
+ SendOk(self, as);
+ }
+
+ static ERetryErrorClass RetryS3SlowDown(long httpResponseCode) {
+ return httpResponseCode == 503 ? ERetryErrorClass::LongRetry : ERetryErrorClass::NoRetry; // S3 Slow Down == 503
+ }
+
+ void SendDiscover() {
+ const auto structedToken = NYql::ComposeStructuredTokenJsonForServiceAccount(ClusterConfig.GetServiceAccountId(), ClusterConfig.GetServiceAccountIdSignature(), ClusterConfig.GetToken());
+ const auto credentialsProviderFactory = CreateCredentialsProviderFactoryForStructuredToken(CredentialsFactory, structedToken);
+ const auto authToken = credentialsProviderFactory->CreateProvider()->GetAuthInfo();
+
+ NYql::IHTTPGateway::THeaders headers;
+ if (authToken) {
+ headers.push_back(TString("X-YaCloud-SubjectToken:") += authToken);
+ }
+
+ const auto retryPolicy = IRetryPolicy<long>::GetExponentialBackoffPolicy(RetryS3SlowDown);
+
+ NYql::TUrlBuilder urlBuilder(ClusterConfig.GetUrl());
+ const auto url = urlBuilder.AddUrlParam("list-type", "2")
+ .AddUrlParam("max-keys", "1")
+ .Build();
+
+ Gateway->Download(
+ url,
+ headers,
+ 0U,
+ std::bind(&DiscoveryCallback, std::placeholders::_1, SelfId(), TActivationContext::ActorSystem()),
+ /*data=*/"",
+ retryPolicy
+ );
+ }
+
+ void DestroyActor(bool success = true) {
+ Counters->InFly->Dec();
+ TDuration delta = TInstant::Now() - StartTime;
+ Counters->LatencyMs->Collect(delta.MilliSeconds());
+ LWPROBE(TestObjectStorageConnectionRequest, Scope, User, delta, success);
+ PassAway();
+ }
+
+ void ReplyError(const TString& message) {
+ TC_LOG_D(Scope << " " << User << " " << NKikimr::MaskTicket(Token) << " Invalid access for object storage connection: " << message);
+ Counters->Error->Inc();
+ Send(Sender, new NYq::TEvTestConnection::TEvTestConnectionResponse(NYql::TIssues{MakeErrorIssue(NYq::TIssuesIds::BAD_REQUEST, message)}), 0, Cookie);
+ DestroyActor(false /* success */);
+ }
+
+ void ReplyOk() {
+ TC_LOG_T(Scope << " " << User << " " << NKikimr::MaskTicket(Token) << " Access is valid for object storage connection");
+ Counters->Ok->Inc();
+ Send(Sender, new NYq::TEvTestConnection::TEvTestConnectionResponse(YandexQuery::TestConnectionResult{}), 0, Cookie);
+ DestroyActor();
+ }
+};
+
+NActors::IActor* CreateTestObjectStorageConnectionActor(
+ const YandexQuery::ObjectStorageConnection& os,
+ const NYq::NConfig::TCommonConfig& commonConfig,
+ const TActorId& sender,
+ ui64 cookie,
+ const NYql::ISecuredServiceAccountCredentialsFactory::TPtr& credentialsFactory,
+ NYql::IHTTPGateway::TPtr gateway,
+ const TString& scope,
+ const TString& user,
+ const TString& token,
+ const NYq::TSigner::TPtr& signer,
+ const TTestConnectionRequestCountersPtr& counters) {
+ return new TTestObjectStorageConnectionActor(
+ os, commonConfig, sender,
+ cookie, credentialsFactory, gateway,
+ scope, user, token, signer, counters);
+}
+
+} // namespace NYq
diff --git a/ydb/library/yql/providers/pq/gateway/dummy/yql_pq_dummy_gateway.cpp b/ydb/library/yql/providers/pq/gateway/dummy/yql_pq_dummy_gateway.cpp
index b1d64befb3..b04f11ac02 100644
--- a/ydb/library/yql/providers/pq/gateway/dummy/yql_pq_dummy_gateway.cpp
+++ b/ydb/library/yql/providers/pq/gateway/dummy/yql_pq_dummy_gateway.cpp
@@ -42,6 +42,11 @@ NPq::NConfigurationManager::TAsyncDescribePathResult TDummyPqGateway::DescribePa
}
}
+NThreading::TFuture<IPqGateway::TListStreams> TDummyPqGateway::ListStreams(const TString& sessionId, const TString& cluster, const TString& database, const TString& token, ui32 limit, const TString& exclusiveStartStreamName) {
+ Y_UNUSED(sessionId, cluster, database, token, limit, exclusiveStartStreamName);
+ return NThreading::MakeFuture<IPqGateway::TListStreams>();
+}
+
TDummyPqGateway& TDummyPqGateway::AddDummyTopic(const TDummyTopic& topic) {
with_lock (Mutex) {
Y_ENSURE(topic.Cluster);
diff --git a/ydb/library/yql/providers/pq/gateway/dummy/yql_pq_dummy_gateway.h b/ydb/library/yql/providers/pq/gateway/dummy/yql_pq_dummy_gateway.h
index 753c849c20..dbf6f6b52b 100644
--- a/ydb/library/yql/providers/pq/gateway/dummy/yql_pq_dummy_gateway.h
+++ b/ydb/library/yql/providers/pq/gateway/dummy/yql_pq_dummy_gateway.h
@@ -34,7 +34,20 @@ public:
NThreading::TFuture<void> OpenSession(const TString& sessionId, const TString& username) override;
void CloseSession(const TString& sessionId) override;
- NPq::NConfigurationManager::TAsyncDescribePathResult DescribePath(const TString& sessionId, const TString& cluster, const TString& database, const TString& path, const TString& token) override;
+ NPq::NConfigurationManager::TAsyncDescribePathResult DescribePath(
+ const TString& sessionId,
+ const TString& cluster,
+ const TString& database,
+ const TString& path,
+ const TString& token) override;
+
+ NThreading::TFuture<TListStreams> ListStreams(
+ const TString& sessionId,
+ const TString& cluster,
+ const TString& database,
+ const TString& token,
+ ui32 limit,
+ const TString& exclusiveStartStreamName = {}) override;
void UpdateClusterConfigs(
const TString& clusterName,
diff --git a/ydb/library/yql/providers/pq/gateway/native/CMakeLists.txt b/ydb/library/yql/providers/pq/gateway/native/CMakeLists.txt
index 9ec4274875..3c8f646c99 100644
--- a/ydb/library/yql/providers/pq/gateway/native/CMakeLists.txt
+++ b/ydb/library/yql/providers/pq/gateway/native/CMakeLists.txt
@@ -13,6 +13,7 @@ target_link_libraries(pq-gateway-native PUBLIC
yutil
common-token_accessor-client
library-yql-utils
+ cpp-client-ydb_datastreams
cpp-client-ydb_driver
cpp-client-ydb_persqueue_core
providers-common-metrics
diff --git a/ydb/library/yql/providers/pq/gateway/native/yql_pq_gateway.cpp b/ydb/library/yql/providers/pq/gateway/native/yql_pq_gateway.cpp
index bd89565050..86673da5a1 100644
--- a/ydb/library/yql/providers/pq/gateway/native/yql_pq_gateway.cpp
+++ b/ydb/library/yql/providers/pq/gateway/native/yql_pq_gateway.cpp
@@ -26,6 +26,14 @@ public:
const TString& path,
const TString& token) override;
+ NThreading::TFuture<TListStreams> ListStreams(
+ const TString& sessionId,
+ const TString& cluster,
+ const TString& database,
+ const TString& token,
+ ui32 limit,
+ const TString& exclusiveStartStreamName = {}) override;
+
void UpdateClusterConfigs(
const TString& clusterName,
const TString& endpoint,
@@ -120,6 +128,10 @@ NPq::NConfigurationManager::TAsyncDescribePathResult TPqNativeGateway::DescribeP
return GetExistingSession(sessionId)->DescribePath(cluster, database, path, token);
}
+NThreading::TFuture<IPqGateway::TListStreams> TPqNativeGateway::ListStreams(const TString& sessionId, const TString& cluster, const TString& database, const TString& token, ui32 limit, const TString& exclusiveStartStreamName) {
+ return GetExistingSession(sessionId)->ListStreams(cluster, database, token, limit, exclusiveStartStreamName);
+}
+
IPqGateway::TPtr CreatePqNativeGateway(const TPqGatewayServices& services) {
return MakeIntrusive<TPqNativeGateway>(services);
}
diff --git a/ydb/library/yql/providers/pq/gateway/native/yql_pq_session.cpp b/ydb/library/yql/providers/pq/gateway/native/yql_pq_session.cpp
index cd536762e4..18d1ae0e47 100644
--- a/ydb/library/yql/providers/pq/gateway/native/yql_pq_session.cpp
+++ b/ydb/library/yql/providers/pq/gateway/native/yql_pq_session.cpp
@@ -25,6 +25,17 @@ NYdb::NPersQueue::TPersQueueClientSettings GetYdbPqClientOptions(const TString&
return opts;
}
+
+NYdb::TCommonClientSettings GetDsClientOptions(const TString& database, const NYql::TPqClusterConfig& cfg, std::shared_ptr<NYdb::ICredentialsProviderFactory> credentialsProviderFactory) {
+ NYdb::TCommonClientSettings opts;
+ opts
+ .DiscoveryEndpoint(cfg.GetEndpoint())
+ .Database(database)
+ .EnableSsl(cfg.GetUseSsl())
+ .CredentialsProviderFactory(credentialsProviderFactory);
+
+ return opts;
+}
}
const NPq::NConfigurationManager::IClient::TPtr& TPqSession::GetConfigManagerClient(const TString& cluster, const NYql::TPqClusterConfig& cfg, std::shared_ptr<NYdb::ICredentialsProviderFactory> credentialsProviderFactory) {
@@ -43,6 +54,14 @@ NYdb::NPersQueue::TPersQueueClient& TPqSession::GetYdbPqClient(const TString& cl
return ClusterYdbPqClients.emplace(cluster, NYdb::NPersQueue::TPersQueueClient(YdbDriver, GetYdbPqClientOptions(database, cfg, credentialsProviderFactory))).first->second;
}
+NYdb::NDataStreams::V1::TDataStreamsClient& TPqSession::GetDsClient(const TString& cluster, const TString& database, const NYql::TPqClusterConfig& cfg, std::shared_ptr<NYdb::ICredentialsProviderFactory> credentialsProviderFactory) {
+ const auto clientIt = ClusterDsClients.find(cluster);
+ if (clientIt != ClusterDsClients.end()) {
+ return clientIt->second;
+ }
+ return ClusterDsClients.emplace(cluster, NYdb::NDataStreams::V1::TDataStreamsClient(YdbDriver, GetDsClientOptions(database, cfg, credentialsProviderFactory))).first->second;
+}
+
NPq::NConfigurationManager::TAsyncDescribePathResult TPqSession::DescribePath(const TString& cluster, const TString& database, const TString& path, const TString& token) {
const auto* config = ClusterConfigs->FindPtr(cluster);
if (!config) {
@@ -78,4 +97,49 @@ NPq::NConfigurationManager::TAsyncDescribePathResult TPqSession::DescribePath(co
}
}
+NThreading::TFuture<IPqGateway::TListStreams> TPqSession::ListStreams(const TString& cluster, const TString& database, const TString& token, ui32 limit, const TString& exclusiveStartStreamName) {
+ const auto* config = ClusterConfigs->FindPtr(cluster);
+ if (!config) {
+ ythrow yexception() << "Pq cluster `" << cluster << "` does not exist";
+ }
+
+ YQL_ENSURE(config->GetEndpoint(), "Can't get list topics for " << cluster << ": no endpoint");
+
+ std::shared_ptr<NYdb::ICredentialsProviderFactory> credentialsProviderFactory = CreateCredentialsProviderFactoryForStructuredToken(CredentialsFactory, token, config->GetAddBearerToToken());
+ with_lock (Mutex) {
+ if (config->GetClusterType() == TPqClusterConfig::CT_PERS_QUEUE) {
+ const NPq::NConfigurationManager::IClient::TPtr& client = GetConfigManagerClient(cluster, *config, credentialsProviderFactory);
+ if (!client) {
+ NThreading::TPromise<IPqGateway::TListStreams> result = NThreading::NewPromise<IPqGateway::TListStreams>();
+ result.SetException(
+ std::make_exception_ptr(
+ yexception()
+ << "Pq configuration manager is not supported"));
+ return result;
+ }
+ return client->DescribePath("/").Apply([](const auto& future) {
+ auto response = future.GetValue();
+ if (!response.IsPath()) {
+ throw yexception() << "response does not contain object of type path";
+ }
+ return IPqGateway::TListStreams{};
+ });
+ }
+
+ return GetDsClient(cluster, database, *config, credentialsProviderFactory)
+ .ListStreams(NYdb::NDataStreams::V1::TListStreamsSettings{ .Limit_ = limit, .ExclusiveStartStreamName_ = exclusiveStartStreamName})
+ .Apply([](const auto& future) {
+ auto& response = future.GetValue();
+ if (!response.IsSuccess()) {
+ throw yexception() << response.GetIssues().ToString();
+ }
+ const auto& result = response.GetResult();
+ IPqGateway::TListStreams listStrems;
+ listStrems.Names.insert(listStrems.Names.end(), result.stream_names().begin(), result.stream_names().end());
+ return listStrems;
+
+ });
+ }
+}
+
} // namespace NYql
diff --git a/ydb/library/yql/providers/pq/gateway/native/yql_pq_session.h b/ydb/library/yql/providers/pq/gateway/native/yql_pq_session.h
index 23129837f0..9ab98b22e0 100644
--- a/ydb/library/yql/providers/pq/gateway/native/yql_pq_session.h
+++ b/ydb/library/yql/providers/pq/gateway/native/yql_pq_session.h
@@ -1,10 +1,13 @@
#pragma once
+#include <ydb/public/sdk/cpp/client/ydb_datastreams/datastreams.h>
#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/persqueue.h>
-#include <ydb/library/yql/providers/common/token_accessor/client/factory.h>
#include <ydb/library/yql/providers/common/proto/gateways_config.pb.h>
+#include <ydb/library/yql/providers/common/token_accessor/client/factory.h>
#include <ydb/library/yql/providers/pq/cm_client/interface/client.h>
+#include <ydb/library/yql/providers/pq/provider/yql_pq_gateway.h>
+
#include <util/generic/ptr.h>
#include <util/system/mutex.h>
@@ -35,9 +38,12 @@ public:
NPq::NConfigurationManager::TAsyncDescribePathResult DescribePath(const TString& cluster, const TString& database, const TString& path, const TString& token);
+ NThreading::TFuture<IPqGateway::TListStreams> ListStreams(const TString& cluster, const TString& database, const TString& token, ui32 limit, const TString& exclusiveStartStreamName);
+
private:
const NPq::NConfigurationManager::IClient::TPtr& GetConfigManagerClient(const TString& cluster, const NYql::TPqClusterConfig& cfg, std::shared_ptr<NYdb::ICredentialsProviderFactory> credentialsProviderFactory);
NYdb::NPersQueue::TPersQueueClient& GetYdbPqClient(const TString& cluster, const TString& database, const NYql::TPqClusterConfig& cfg, std::shared_ptr<NYdb::ICredentialsProviderFactory> credentialsProviderFactory);
+ NYdb::NDataStreams::V1::TDataStreamsClient& GetDsClient(const TString& cluster, const TString& database, const NYql::TPqClusterConfig& cfg, std::shared_ptr<NYdb::ICredentialsProviderFactory> credentialsProviderFactory);
private:
const TString SessionId;
@@ -50,6 +56,7 @@ private:
TMutex Mutex;
THashMap<TString, NPq::NConfigurationManager::IClient::TPtr> ClusterCmClients; // Cluster -> CM Client.
THashMap<TString, NYdb::NPersQueue::TPersQueueClient> ClusterYdbPqClients; // Cluster -> PQ Client.
+ THashMap<TString, NYdb::NDataStreams::V1::TDataStreamsClient> ClusterDsClients; // Cluster -> DS Client
};
} // namespace NYql
diff --git a/ydb/library/yql/providers/pq/provider/yql_pq_gateway.h b/ydb/library/yql/providers/pq/provider/yql_pq_gateway.h
index 118b37c012..cee8cfffac 100644
--- a/ydb/library/yql/providers/pq/provider/yql_pq_gateway.h
+++ b/ydb/library/yql/providers/pq/provider/yql_pq_gateway.h
@@ -1,6 +1,7 @@
#pragma once
#include <ydb/library/yql/providers/common/proto/gateways_config.pb.h>
#include <ydb/library/yql/providers/pq/cm_client/interface/client.h>
+#include <ydb/public/sdk/cpp/client/ydb_datastreams/datastreams.h>
#include <library/cpp/threading/future/core/future.h>
@@ -12,12 +13,19 @@ namespace NYql {
struct IPqGateway : public TThrRefBase {
using TPtr = TIntrusivePtr<IPqGateway>;
+ struct TListStreams {
+ TVector<TString> Names;
+ };
+
virtual NThreading::TFuture<void> OpenSession(const TString& sessionId, const TString& username) = 0;
virtual void CloseSession(const TString& sessionId) = 0;
// CM API.
virtual ::NPq::NConfigurationManager::TAsyncDescribePathResult DescribePath(const TString& sessionId, const TString& cluster, const TString& database, const TString& path, const TString& token) = 0;
+ // DS API.
+ virtual NThreading::TFuture<TListStreams> ListStreams(const TString& sessionId, const TString& cluster, const TString& database, const TString& token, ui32 limit, const TString& exclusiveStartStreamName = {}) = 0;
+
virtual void UpdateClusterConfigs(
const TString& clusterName,
const TString& endpoint,
diff --git a/ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.cpp b/ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.cpp
index 293bba1a42..d6d4141454 100644
--- a/ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.cpp
+++ b/ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.cpp
@@ -5,13 +5,14 @@
#include <ydb/library/yql/dq/actors/protos/dq_events.pb.h>
#include <ydb/library/yql/dq/proto/dq_checkpoint.pb.h>
-#include <ydb/library/yql/utils/actor_log/log.h>
-#include <ydb/library/yql/utils/log/log.h>
#include <ydb/library/yql/minikql/comp_nodes/mkql_saveload.h>
#include <ydb/library/yql/minikql/mkql_alloc.h>
#include <ydb/library/yql/minikql/mkql_string_util.h>
-#include <ydb/library/yql/utils/yql_panic.h>
+#include <ydb/library/yql/utils/actor_log/log.h>
#include <ydb/library/yql/utils/actors/http_sender_actor.h>
+#include <ydb/library/yql/utils/log/log.h>
+#include <ydb/library/yql/utils/url_builder.h>
+#include <ydb/library/yql/utils/yql_panic.h>
#include <library/cpp/actors/core/actor.h>
#include <library/cpp/actors/core/event_local.h>
@@ -75,6 +76,32 @@ struct TMetricsInflight {
} // namespace
+TString GetSolomonUrl(const TString& endpoint, bool useSsl, const TString& project, const TString& cluster, const TString& service, const ::NYql::NSo::NProto::ESolomonClusterType& type) {
+ TUrlBuilder builder((useSsl ? "https://" : "http://") + endpoint);
+
+ switch (type) {
+ case NSo::NProto::ESolomonClusterType::CT_SOLOMON: {
+ builder.AddPathComponent("api");
+ builder.AddPathComponent("v2");
+ builder.AddPathComponent("push");
+ builder.AddUrlParam("project", project);
+ builder.AddUrlParam("cluster", cluster);
+ builder.AddUrlParam("service", service);
+ break;
+ }
+ case NSo::NProto::ESolomonClusterType::CT_MONITORING: {
+ builder.AddPathComponent("monitoring/v2/data/write");
+ builder.AddUrlParam("folderId", cluster);
+ builder.AddUrlParam("service", service);
+ break;
+ }
+ default:
+ Y_ENSURE(false, "Invalid cluster type " << ToString<ui32>(type));
+ }
+
+ return builder.Build();
+}
+
class TDqSolomonWriteActor : public NActors::TActor<TDqSolomonWriteActor>, public IDqComputeActorAsyncOutput {
public:
static constexpr char ActorName[] = "DQ_SOLOMON_WRITE_ACTOR";
@@ -246,29 +273,12 @@ private:
NDqProto::TSinkState BuildState() { return {}; }
TString GetUrl() const {
- TStringBuilder builder;
- builder << (WriteParams.Shard.GetUseSsl() ? "https://" : "http://");
- builder << WriteParams.Shard.GetEndpoint();
-
- switch (WriteParams.Shard.GetClusterType()) {
- case NSo::NProto::ESolomonClusterType::CT_SOLOMON: {
- builder << "/api/v2/push";
- builder << "?project=" << WriteParams.Shard.GetProject();
- builder << "&cluster=" << WriteParams.Shard.GetCluster();
- builder << "&service=" << WriteParams.Shard.GetService();
- break;
- }
- case NSo::NProto::ESolomonClusterType::CT_MONITORING: {
- builder << "/monitoring/v2/data/write";
- builder << "?folderId=" << WriteParams.Shard.GetCluster();
- builder << "&service=" << WriteParams.Shard.GetService();
- break;
- }
- default:
- Y_ENSURE(false, "Invalid cluster type " << ToString<ui32>(WriteParams.Shard.GetClusterType()));
- }
-
- return builder;
+ return GetSolomonUrl(WriteParams.Shard.GetEndpoint(),
+ WriteParams.Shard.GetUseSsl(),
+ WriteParams.Shard.GetProject(),
+ WriteParams.Shard.GetCluster(),
+ WriteParams.Shard.GetService(),
+ WriteParams.Shard.GetClusterType());
}
void PushMetricsToBuffer(ui64& metricsCount) {
diff --git a/ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.h b/ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.h
index de6df23756..aa5469f10b 100644
--- a/ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.h
+++ b/ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.h
@@ -29,4 +29,6 @@ std::pair<NYql::NDq::IDqComputeActorAsyncOutput*, NActors::IActor*> CreateDqSolo
void RegisterDQSolomonWriteActorFactory(TDqSinkFactory& factory, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory);
+TString GetSolomonUrl(const TString& endpoint, bool useSsl, const TString& project, const TString& cluster, const TString& service, const ::NYql::NSo::NProto::ESolomonClusterType& type);
+
} // namespace NYql::NDq