diff options
author | Egor Zudin <e-zudin@ydb.tech> | 2023-12-22 15:44:53 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-12-22 15:44:53 +0300 |
commit | ecc3d949d899594ac30c3d99ba30d6e7e264e751 (patch) | |
tree | 8c2a2d30d9ddb292900ada25e539dcbd0507a441 | |
parent | abb887b68717defa590b2c21b1d314e644e1c4d0 (diff) | |
download | ydb-ecc3d949d899594ac30c3d99ba30d6e7e264e751.tar.gz |
YQ-2628: add connections option for streaming queries (#579)
5 files changed, 17 insertions, 1 deletions
diff --git a/ydb/core/fq/libs/config/protos/control_plane_storage.proto b/ydb/core/fq/libs/config/protos/control_plane_storage.proto index fa8ed94fec..1c4b668d4c 100644 --- a/ydb/core/fq/libs/config/protos/control_plane_storage.proto +++ b/ydb/core/fq/libs/config/protos/control_plane_storage.proto @@ -73,4 +73,5 @@ message TControlPlaneStorageConfig { bool DumpRawStatistics = 32; bool IgnorePrivateSources = 33; Ydb.Query.StatsMode StatsMode = 34; + repeated string AvailableStreamingConnection = 35; } diff --git a/ydb/core/fq/libs/control_plane_storage/config.cpp b/ydb/core/fq/libs/control_plane_storage/config.cpp index 9661bdedf1..2cd4dd6bbe 100644 --- a/ydb/core/fq/libs/control_plane_storage/config.cpp +++ b/ydb/core/fq/libs/control_plane_storage/config.cpp @@ -40,6 +40,10 @@ TControlPlaneStorageConfig::TControlPlaneStorageConfig(const NConfig::TControlPl AvailableBindings.insert(GetBindingType(availableBinding)); } + for (const auto& availableConnection : Proto.GetAvailableStreamingConnection()) { + AvailableStreamingConnections.insert(GetConnectionType(availableConnection)); + } + GeneratorPathsLimit = s3Config.HasGeneratorPathsLimit() ? s3Config.GetGeneratorPathsLimit() : 50'000; diff --git a/ydb/core/fq/libs/control_plane_storage/config.h b/ydb/core/fq/libs/control_plane_storage/config.h index fc4fcf4893..16c8b96105 100644 --- a/ydb/core/fq/libs/control_plane_storage/config.h +++ b/ydb/core/fq/libs/control_plane_storage/config.h @@ -30,6 +30,7 @@ struct TControlPlaneStorageConfig { TRetryPolicyItem TaskLeaseRetryPolicy; TDuration QuotaTtl; TDuration MetricsTtl; + TSet<FederatedQuery::ConnectionSetting::ConnectionCase> AvailableStreamingConnections; TControlPlaneStorageConfig(const NConfig::TControlPlaneStorageConfig& config, const NYql::TS3GatewayConfig& s3Config, const NConfig::TCommonConfig& common, const NConfig::TComputeConfig& computeConfigProto); }; diff --git a/ydb/core/fq/libs/control_plane_storage/util.cpp b/ydb/core/fq/libs/control_plane_storage/util.cpp index c31aa8e20b..74c5bd8664 100644 --- a/ydb/core/fq/libs/control_plane_storage/util.cpp +++ b/ydb/core/fq/libs/control_plane_storage/util.cpp @@ -161,6 +161,11 @@ NConfig::TControlPlaneStorageConfig FillDefaultParameters(NConfig::TControlPlane config.SetResultSetsTtl("1d"); } + if (config.AvailableStreamingConnectionSize() == 0) { + // For backward compatibility, TODO: YQ-2628, remove after config update on every cluster + config.MutableAvailableStreamingConnection()->CopyFrom(config.GetAvailableConnection()); + } + return config; } diff --git a/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp b/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp index 19bdf53601..7ab56126f1 100644 --- a/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp +++ b/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp @@ -219,7 +219,12 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvCreateQuery *queryInternal.mutable_compute_connection() = computeDatabase.connection(); TSet<TString> disabledConnections; for (const auto& connection: GetEntities<FederatedQuery::Connection>(resultSets[resultSets.size() - 2], CONNECTION_COLUMN_NAME, Config->Proto.GetIgnorePrivateSources(), commonCounters)) { - if (!Config->AvailableConnections.contains(connection.content().setting().connection_case())) { + auto connectionCase = connection.content().setting().connection_case(); + if (!Config->AvailableConnections.contains(connectionCase)) { + disabledConnections.insert(connection.meta().id()); + continue; + } + if ((queryType == FederatedQuery::QueryContent::STREAMING) && !Config->AvailableStreamingConnections.contains(connectionCase)) { disabledConnections.insert(connection.meta().id()); continue; } |