diff options
author | andrewproni <andrewproni@yandex-team.com> | 2023-09-02 20:46:37 +0300 |
---|---|---|
committer | andrewproni <andrewproni@yandex-team.com> | 2023-09-02 21:00:28 +0300 |
commit | e2fd1720f4046c30ddc537b66392804c7f77dc60 (patch) | |
tree | 3868ab307ca91692610bdda46ccbf567d7a7f482 | |
parent | f3351138d25ba9b9c86194ca826e0cb1257aff89 (diff) | |
download | ydb-e2fd1720f4046c30ddc537b66392804c7f77dc60.tar.gz |
Using S3GatewayConfig in KqpHost
-rw-r--r-- | ydb/core/kqp/federated_query/kqp_federated_query_helpers.cpp | 7 | ||||
-rw-r--r-- | ydb/core/kqp/federated_query/kqp_federated_query_helpers.h | 15 | ||||
-rw-r--r-- | ydb/core/kqp/host/kqp_host.cpp | 14 | ||||
-rw-r--r-- | ydb/core/kqp/node_service/kqp_node_ut.cpp | 2 | ||||
-rw-r--r-- | ydb/core/kqp/ut/federated_query/common/common.cpp | 2 | ||||
-rw-r--r-- | ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp | 2 |
6 files changed, 20 insertions, 22 deletions
diff --git a/ydb/core/kqp/federated_query/kqp_federated_query_helpers.cpp b/ydb/core/kqp/federated_query/kqp_federated_query_helpers.cpp index 345fc8db6cd..665c036f3f7 100644 --- a/ydb/core/kqp/federated_query/kqp_federated_query_helpers.cpp +++ b/ydb/core/kqp/federated_query/kqp_federated_query_helpers.cpp @@ -41,6 +41,8 @@ namespace NKikimr::NKqp { HttpGatewayConfig = queryServiceConfig.HasHttpGateway() ? queryServiceConfig.GetHttpGateway() : DefaultHttpGatewayConfig(); HttpGateway = NYql::IHTTPGateway::Make(&HttpGatewayConfig); + S3GatewayConfig = queryServiceConfig.GetS3(); + // Initialize Token Accessor if (appConfig.GetAuthConfig().HasTokenAccessorConfig()) { const auto& tokenAccessorConfig = appConfig.GetAuthConfig().GetTokenAccessorConfig(); @@ -93,11 +95,12 @@ namespace NKikimr::NKqp { HttpGateway, ConnectorClient, CredentialsFactory, - nullptr}; + nullptr, + S3GatewayConfig}; // Init DatabaseAsyncResolver only if all requirements are met if (DatabaseResolverActorId && MdbGateway && MdbEndpointGenerator) { - result.DatabaseAsyncResovler = std::make_shared<NFq::TDatabaseAsyncResolverImpl>( + result.DatabaseAsyncResolver = std::make_shared<NFq::TDatabaseAsyncResolverImpl>( actorSystem, DatabaseResolverActorId.value(), "", // TODO: use YDB Gateway endpoint? diff --git a/ydb/core/kqp/federated_query/kqp_federated_query_helpers.h b/ydb/core/kqp/federated_query/kqp_federated_query_helpers.h index 67b1eaecfa7..40839709e22 100644 --- a/ydb/core/kqp/federated_query/kqp_federated_query_helpers.h +++ b/ydb/core/kqp/federated_query/kqp_federated_query_helpers.h @@ -15,7 +15,8 @@ namespace NKikimr::NKqp { NYql::IHTTPGateway::TPtr HttpGateway; NYql::NConnector::IClient::TPtr ConnectorClient; NYql::ISecuredServiceAccountCredentialsFactory::TPtr CredentialsFactory; - NYql::IDatabaseAsyncResolver::TPtr DatabaseAsyncResovler; + NYql::IDatabaseAsyncResolver::TPtr DatabaseAsyncResolver; + NYql::TS3GatewayConfig S3GatewayConfig; }; struct IKqpFederatedQuerySetupFactory { @@ -43,6 +44,7 @@ namespace NKikimr::NKqp { private: NYql::THttpGatewayConfig HttpGatewayConfig; NYql::IHTTPGateway::TPtr HttpGateway; + NYql::TS3GatewayConfig S3GatewayConfig; NYql::ISecuredServiceAccountCredentialsFactory::TPtr CredentialsFactory; NYql::NConnector::IClient::TPtr ConnectorClient; std::optional<NActors::TActorId> DatabaseResolverActorId; @@ -57,24 +59,27 @@ namespace NKikimr::NKqp { NYql::IHTTPGateway::TPtr httpGateway, NYql::NConnector::IClient::TPtr connectorClient, NYql::ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, - NYql::IDatabaseAsyncResolver::TPtr databaseAsyncResovler) + NYql::IDatabaseAsyncResolver::TPtr databaseAsyncResolver, + const NYql::TS3GatewayConfig& s3GatewayConfig) : HttpGateway(httpGateway) , ConnectorClient(connectorClient) , CredentialsFactory(credentialsFactory) - , DatabaseAsyncResovler(databaseAsyncResovler) + , DatabaseAsyncResolver(databaseAsyncResolver) + , S3GatewayConfig(s3GatewayConfig) { } std::optional<TKqpFederatedQuerySetup> Make(NActors::TActorSystem*) override { return TKqpFederatedQuerySetup{ - HttpGateway, ConnectorClient, CredentialsFactory, DatabaseAsyncResovler}; + HttpGateway, ConnectorClient, CredentialsFactory, DatabaseAsyncResolver, S3GatewayConfig}; } private: NYql::IHTTPGateway::TPtr HttpGateway; NYql::NConnector::IClient::TPtr ConnectorClient; NYql::ISecuredServiceAccountCredentialsFactory::TPtr CredentialsFactory; - NYql::IDatabaseAsyncResolver::TPtr DatabaseAsyncResovler; + NYql::IDatabaseAsyncResolver::TPtr DatabaseAsyncResolver; + NYql::TS3GatewayConfig S3GatewayConfig; }; IKqpFederatedQuerySetupFactory::TPtr MakeKqpFederatedQuerySetupFactory( diff --git a/ydb/core/kqp/host/kqp_host.cpp b/ydb/core/kqp/host/kqp_host.cpp index 2af39a3c389..376051c42c9 100644 --- a/ydb/core/kqp/host/kqp_host.cpp +++ b/ydb/core/kqp/host/kqp_host.cpp @@ -1498,17 +1498,7 @@ private: state->FunctionRegistry = FuncRegistry; state->CredentialsFactory = FederatedQuerySetup->CredentialsFactory; - // - // TODO: Use TS3GatewayConfig from Kikimr Config when added - // - NYql::TS3GatewayConfig cfg; - cfg.SetMaxReadSizePerQuery(100_GB); - { - auto& setting = *cfg.AddDefaultSettings(); - setting.SetName("UseBlocksSource"); - setting.SetValue("true"); - } - state->Configuration->Init(cfg, TypesCtx); + state->Configuration->Init(FederatedQuerySetup->S3GatewayConfig, TypesCtx); auto dataSource = NYql::CreateS3DataSource(state, FederatedQuerySetup->HttpGateway); auto dataSink = NYql::CreateS3DataSink(state, FederatedQuerySetup->HttpGateway); @@ -1525,7 +1515,7 @@ private: auto state = MakeIntrusive<NYql::TGenericState>( TypesCtx.Get(), FuncRegistry, - FederatedQuerySetup->DatabaseAsyncResovler, + FederatedQuerySetup->DatabaseAsyncResolver, FederatedQuerySetup->ConnectorClient, nullptr ); diff --git a/ydb/core/kqp/node_service/kqp_node_ut.cpp b/ydb/core/kqp/node_service/kqp_node_ut.cpp index a856f199a92..3c88d82d783 100644 --- a/ydb/core/kqp/node_service/kqp_node_ut.cpp +++ b/ydb/core/kqp/node_service/kqp_node_ut.cpp @@ -184,7 +184,7 @@ public: Runtime->EnableScheduleForActor(ResourceManagerActorId, true); WaitForBootstrap(); - auto FederatedQuerySetup = std::make_optional<TKqpFederatedQuerySetup>({NYql::IHTTPGateway::Make(), nullptr, nullptr, nullptr}); + auto FederatedQuerySetup = std::make_optional<TKqpFederatedQuerySetup>({NYql::IHTTPGateway::Make(), nullptr, nullptr, nullptr, {}}); auto asyncIoFactory = CreateKqpAsyncIoFactory(KqpCounters, FederatedQuerySetup); auto kqpNode = CreateKqpNodeService(config, KqpCounters, CompFactory.Get(), asyncIoFactory); KqpNodeActorId = Runtime->Register(kqpNode); diff --git a/ydb/core/kqp/ut/federated_query/common/common.cpp b/ydb/core/kqp/ut/federated_query/common/common.cpp index 1c8faf78925..c221217f728 100644 --- a/ydb/core/kqp/ut/federated_query/common/common.cpp +++ b/ydb/core/kqp/ut/federated_query/common/common.cpp @@ -27,7 +27,7 @@ namespace NKikimr::NKqp::NFederatedQueryTest { featureFlags.SetEnableScriptExecutionOperations(true); auto federatedQuerySetupFactory = std::make_shared<TKqpFederatedQuerySetupFactoryMock>( - httpGateway, connectorClient, nullptr, nullptr + httpGateway, connectorClient, nullptr, nullptr, appConfig ? appConfig->GetQueryServiceConfig().GetS3() : NYql::TS3GatewayConfig() ); auto settings = TKikimrSettings() diff --git a/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp b/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp index 777eec3497c..17a74e42e8f 100644 --- a/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp +++ b/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp @@ -52,7 +52,7 @@ TIntrusivePtr<IKqpHost> CreateKikimrQueryProcessor(TIntrusivePtr<IKqpGateway> ga UNIT_ASSERT(TryParseFromTextFormat(defaultSettingsStream, defaultSettings)); kikimrConfig->Init(defaultSettings.GetDefaultSettings(), cluster, settings, true); - auto federatedQuerySetup = std::make_optional<TKqpFederatedQuerySetup>({NYql::IHTTPGateway::Make(), nullptr, nullptr, nullptr}); + auto federatedQuerySetup = std::make_optional<TKqpFederatedQuerySetup>({NYql::IHTTPGateway::Make(), nullptr, nullptr, nullptr, {}}); return NKqp::CreateKqpHost(gateway, cluster, "/Root", kikimrConfig, moduleResolver, federatedQuerySetup, funcRegistry, funcRegistry, keepConfigChanges); } |