aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorEgor Zudin <e-zudin@ydb.tech>2023-12-22 15:44:53 +0300
committerGitHub <noreply@github.com>2023-12-22 15:44:53 +0300
commitecc3d949d899594ac30c3d99ba30d6e7e264e751 (patch)
tree8c2a2d30d9ddb292900ada25e539dcbd0507a441
parentabb887b68717defa590b2c21b1d314e644e1c4d0 (diff)
downloadydb-ecc3d949d899594ac30c3d99ba30d6e7e264e751.tar.gz
YQ-2628: add connections option for streaming queries (#579)
-rw-r--r--ydb/core/fq/libs/config/protos/control_plane_storage.proto1
-rw-r--r--ydb/core/fq/libs/control_plane_storage/config.cpp4
-rw-r--r--ydb/core/fq/libs/control_plane_storage/config.h1
-rw-r--r--ydb/core/fq/libs/control_plane_storage/util.cpp5
-rw-r--r--ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp7
5 files changed, 17 insertions, 1 deletions
diff --git a/ydb/core/fq/libs/config/protos/control_plane_storage.proto b/ydb/core/fq/libs/config/protos/control_plane_storage.proto
index fa8ed94fec..1c4b668d4c 100644
--- a/ydb/core/fq/libs/config/protos/control_plane_storage.proto
+++ b/ydb/core/fq/libs/config/protos/control_plane_storage.proto
@@ -73,4 +73,5 @@ message TControlPlaneStorageConfig {
bool DumpRawStatistics = 32;
bool IgnorePrivateSources = 33;
Ydb.Query.StatsMode StatsMode = 34;
+ repeated string AvailableStreamingConnection = 35;
}
diff --git a/ydb/core/fq/libs/control_plane_storage/config.cpp b/ydb/core/fq/libs/control_plane_storage/config.cpp
index 9661bdedf1..2cd4dd6bbe 100644
--- a/ydb/core/fq/libs/control_plane_storage/config.cpp
+++ b/ydb/core/fq/libs/control_plane_storage/config.cpp
@@ -40,6 +40,10 @@ TControlPlaneStorageConfig::TControlPlaneStorageConfig(const NConfig::TControlPl
AvailableBindings.insert(GetBindingType(availableBinding));
}
+ for (const auto& availableConnection : Proto.GetAvailableStreamingConnection()) {
+ AvailableStreamingConnections.insert(GetConnectionType(availableConnection));
+ }
+
GeneratorPathsLimit =
s3Config.HasGeneratorPathsLimit() ? s3Config.GetGeneratorPathsLimit() : 50'000;
diff --git a/ydb/core/fq/libs/control_plane_storage/config.h b/ydb/core/fq/libs/control_plane_storage/config.h
index fc4fcf4893..16c8b96105 100644
--- a/ydb/core/fq/libs/control_plane_storage/config.h
+++ b/ydb/core/fq/libs/control_plane_storage/config.h
@@ -30,6 +30,7 @@ struct TControlPlaneStorageConfig {
TRetryPolicyItem TaskLeaseRetryPolicy;
TDuration QuotaTtl;
TDuration MetricsTtl;
+ TSet<FederatedQuery::ConnectionSetting::ConnectionCase> AvailableStreamingConnections;
TControlPlaneStorageConfig(const NConfig::TControlPlaneStorageConfig& config, const NYql::TS3GatewayConfig& s3Config, const NConfig::TCommonConfig& common, const NConfig::TComputeConfig& computeConfigProto);
};
diff --git a/ydb/core/fq/libs/control_plane_storage/util.cpp b/ydb/core/fq/libs/control_plane_storage/util.cpp
index c31aa8e20b..74c5bd8664 100644
--- a/ydb/core/fq/libs/control_plane_storage/util.cpp
+++ b/ydb/core/fq/libs/control_plane_storage/util.cpp
@@ -161,6 +161,11 @@ NConfig::TControlPlaneStorageConfig FillDefaultParameters(NConfig::TControlPlane
config.SetResultSetsTtl("1d");
}
+ if (config.AvailableStreamingConnectionSize() == 0) {
+ // For backward compatibility, TODO: YQ-2628, remove after config update on every cluster
+ config.MutableAvailableStreamingConnection()->CopyFrom(config.GetAvailableConnection());
+ }
+
return config;
}
diff --git a/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp b/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp
index 19bdf53601..7ab56126f1 100644
--- a/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp
+++ b/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp
@@ -219,7 +219,12 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvCreateQuery
*queryInternal.mutable_compute_connection() = computeDatabase.connection();
TSet<TString> disabledConnections;
for (const auto& connection: GetEntities<FederatedQuery::Connection>(resultSets[resultSets.size() - 2], CONNECTION_COLUMN_NAME, Config->Proto.GetIgnorePrivateSources(), commonCounters)) {
- if (!Config->AvailableConnections.contains(connection.content().setting().connection_case())) {
+ auto connectionCase = connection.content().setting().connection_case();
+ if (!Config->AvailableConnections.contains(connectionCase)) {
+ disabledConnections.insert(connection.meta().id());
+ continue;
+ }
+ if ((queryType == FederatedQuery::QueryContent::STREAMING) && !Config->AvailableStreamingConnections.contains(connectionCase)) {
disabledConnections.insert(connection.meta().id());
continue;
}