aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorandrewproni <andrewproni@yandex-team.com>2023-09-02 20:46:37 +0300
committerandrewproni <andrewproni@yandex-team.com>2023-09-02 21:00:28 +0300
commite2fd1720f4046c30ddc537b66392804c7f77dc60 (patch)
tree3868ab307ca91692610bdda46ccbf567d7a7f482
parentf3351138d25ba9b9c86194ca826e0cb1257aff89 (diff)
downloadydb-e2fd1720f4046c30ddc537b66392804c7f77dc60.tar.gz
Using S3GatewayConfig in KqpHost
-rw-r--r--ydb/core/kqp/federated_query/kqp_federated_query_helpers.cpp7
-rw-r--r--ydb/core/kqp/federated_query/kqp_federated_query_helpers.h15
-rw-r--r--ydb/core/kqp/host/kqp_host.cpp14
-rw-r--r--ydb/core/kqp/node_service/kqp_node_ut.cpp2
-rw-r--r--ydb/core/kqp/ut/federated_query/common/common.cpp2
-rw-r--r--ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp2
6 files changed, 20 insertions, 22 deletions
diff --git a/ydb/core/kqp/federated_query/kqp_federated_query_helpers.cpp b/ydb/core/kqp/federated_query/kqp_federated_query_helpers.cpp
index 345fc8db6cd..665c036f3f7 100644
--- a/ydb/core/kqp/federated_query/kqp_federated_query_helpers.cpp
+++ b/ydb/core/kqp/federated_query/kqp_federated_query_helpers.cpp
@@ -41,6 +41,8 @@ namespace NKikimr::NKqp {
HttpGatewayConfig = queryServiceConfig.HasHttpGateway() ? queryServiceConfig.GetHttpGateway() : DefaultHttpGatewayConfig();
HttpGateway = NYql::IHTTPGateway::Make(&HttpGatewayConfig);
+ S3GatewayConfig = queryServiceConfig.GetS3();
+
// Initialize Token Accessor
if (appConfig.GetAuthConfig().HasTokenAccessorConfig()) {
const auto& tokenAccessorConfig = appConfig.GetAuthConfig().GetTokenAccessorConfig();
@@ -93,11 +95,12 @@ namespace NKikimr::NKqp {
HttpGateway,
ConnectorClient,
CredentialsFactory,
- nullptr};
+ nullptr,
+ S3GatewayConfig};
// Init DatabaseAsyncResolver only if all requirements are met
if (DatabaseResolverActorId && MdbGateway && MdbEndpointGenerator) {
- result.DatabaseAsyncResovler = std::make_shared<NFq::TDatabaseAsyncResolverImpl>(
+ result.DatabaseAsyncResolver = std::make_shared<NFq::TDatabaseAsyncResolverImpl>(
actorSystem,
DatabaseResolverActorId.value(),
"", // TODO: use YDB Gateway endpoint?
diff --git a/ydb/core/kqp/federated_query/kqp_federated_query_helpers.h b/ydb/core/kqp/federated_query/kqp_federated_query_helpers.h
index 67b1eaecfa7..40839709e22 100644
--- a/ydb/core/kqp/federated_query/kqp_federated_query_helpers.h
+++ b/ydb/core/kqp/federated_query/kqp_federated_query_helpers.h
@@ -15,7 +15,8 @@ namespace NKikimr::NKqp {
NYql::IHTTPGateway::TPtr HttpGateway;
NYql::NConnector::IClient::TPtr ConnectorClient;
NYql::ISecuredServiceAccountCredentialsFactory::TPtr CredentialsFactory;
- NYql::IDatabaseAsyncResolver::TPtr DatabaseAsyncResovler;
+ NYql::IDatabaseAsyncResolver::TPtr DatabaseAsyncResolver;
+ NYql::TS3GatewayConfig S3GatewayConfig;
};
struct IKqpFederatedQuerySetupFactory {
@@ -43,6 +44,7 @@ namespace NKikimr::NKqp {
private:
NYql::THttpGatewayConfig HttpGatewayConfig;
NYql::IHTTPGateway::TPtr HttpGateway;
+ NYql::TS3GatewayConfig S3GatewayConfig;
NYql::ISecuredServiceAccountCredentialsFactory::TPtr CredentialsFactory;
NYql::NConnector::IClient::TPtr ConnectorClient;
std::optional<NActors::TActorId> DatabaseResolverActorId;
@@ -57,24 +59,27 @@ namespace NKikimr::NKqp {
NYql::IHTTPGateway::TPtr httpGateway,
NYql::NConnector::IClient::TPtr connectorClient,
NYql::ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory,
- NYql::IDatabaseAsyncResolver::TPtr databaseAsyncResovler)
+ NYql::IDatabaseAsyncResolver::TPtr databaseAsyncResolver,
+ const NYql::TS3GatewayConfig& s3GatewayConfig)
: HttpGateway(httpGateway)
, ConnectorClient(connectorClient)
, CredentialsFactory(credentialsFactory)
- , DatabaseAsyncResovler(databaseAsyncResovler)
+ , DatabaseAsyncResolver(databaseAsyncResolver)
+ , S3GatewayConfig(s3GatewayConfig)
{
}
std::optional<TKqpFederatedQuerySetup> Make(NActors::TActorSystem*) override {
return TKqpFederatedQuerySetup{
- HttpGateway, ConnectorClient, CredentialsFactory, DatabaseAsyncResovler};
+ HttpGateway, ConnectorClient, CredentialsFactory, DatabaseAsyncResolver, S3GatewayConfig};
}
private:
NYql::IHTTPGateway::TPtr HttpGateway;
NYql::NConnector::IClient::TPtr ConnectorClient;
NYql::ISecuredServiceAccountCredentialsFactory::TPtr CredentialsFactory;
- NYql::IDatabaseAsyncResolver::TPtr DatabaseAsyncResovler;
+ NYql::IDatabaseAsyncResolver::TPtr DatabaseAsyncResolver;
+ NYql::TS3GatewayConfig S3GatewayConfig;
};
IKqpFederatedQuerySetupFactory::TPtr MakeKqpFederatedQuerySetupFactory(
diff --git a/ydb/core/kqp/host/kqp_host.cpp b/ydb/core/kqp/host/kqp_host.cpp
index 2af39a3c389..376051c42c9 100644
--- a/ydb/core/kqp/host/kqp_host.cpp
+++ b/ydb/core/kqp/host/kqp_host.cpp
@@ -1498,17 +1498,7 @@ private:
state->FunctionRegistry = FuncRegistry;
state->CredentialsFactory = FederatedQuerySetup->CredentialsFactory;
- //
- // TODO: Use TS3GatewayConfig from Kikimr Config when added
- //
- NYql::TS3GatewayConfig cfg;
- cfg.SetMaxReadSizePerQuery(100_GB);
- {
- auto& setting = *cfg.AddDefaultSettings();
- setting.SetName("UseBlocksSource");
- setting.SetValue("true");
- }
- state->Configuration->Init(cfg, TypesCtx);
+ state->Configuration->Init(FederatedQuerySetup->S3GatewayConfig, TypesCtx);
auto dataSource = NYql::CreateS3DataSource(state, FederatedQuerySetup->HttpGateway);
auto dataSink = NYql::CreateS3DataSink(state, FederatedQuerySetup->HttpGateway);
@@ -1525,7 +1515,7 @@ private:
auto state = MakeIntrusive<NYql::TGenericState>(
TypesCtx.Get(),
FuncRegistry,
- FederatedQuerySetup->DatabaseAsyncResovler,
+ FederatedQuerySetup->DatabaseAsyncResolver,
FederatedQuerySetup->ConnectorClient,
nullptr
);
diff --git a/ydb/core/kqp/node_service/kqp_node_ut.cpp b/ydb/core/kqp/node_service/kqp_node_ut.cpp
index a856f199a92..3c88d82d783 100644
--- a/ydb/core/kqp/node_service/kqp_node_ut.cpp
+++ b/ydb/core/kqp/node_service/kqp_node_ut.cpp
@@ -184,7 +184,7 @@ public:
Runtime->EnableScheduleForActor(ResourceManagerActorId, true);
WaitForBootstrap();
- auto FederatedQuerySetup = std::make_optional<TKqpFederatedQuerySetup>({NYql::IHTTPGateway::Make(), nullptr, nullptr, nullptr});
+ auto FederatedQuerySetup = std::make_optional<TKqpFederatedQuerySetup>({NYql::IHTTPGateway::Make(), nullptr, nullptr, nullptr, {}});
auto asyncIoFactory = CreateKqpAsyncIoFactory(KqpCounters, FederatedQuerySetup);
auto kqpNode = CreateKqpNodeService(config, KqpCounters, CompFactory.Get(), asyncIoFactory);
KqpNodeActorId = Runtime->Register(kqpNode);
diff --git a/ydb/core/kqp/ut/federated_query/common/common.cpp b/ydb/core/kqp/ut/federated_query/common/common.cpp
index 1c8faf78925..c221217f728 100644
--- a/ydb/core/kqp/ut/federated_query/common/common.cpp
+++ b/ydb/core/kqp/ut/federated_query/common/common.cpp
@@ -27,7 +27,7 @@ namespace NKikimr::NKqp::NFederatedQueryTest {
featureFlags.SetEnableScriptExecutionOperations(true);
auto federatedQuerySetupFactory = std::make_shared<TKqpFederatedQuerySetupFactoryMock>(
- httpGateway, connectorClient, nullptr, nullptr
+ httpGateway, connectorClient, nullptr, nullptr, appConfig ? appConfig->GetQueryServiceConfig().GetS3() : NYql::TS3GatewayConfig()
);
auto settings = TKikimrSettings()
diff --git a/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp b/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp
index 777eec3497c..17a74e42e8f 100644
--- a/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp
+++ b/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp
@@ -52,7 +52,7 @@ TIntrusivePtr<IKqpHost> CreateKikimrQueryProcessor(TIntrusivePtr<IKqpGateway> ga
UNIT_ASSERT(TryParseFromTextFormat(defaultSettingsStream, defaultSettings));
kikimrConfig->Init(defaultSettings.GetDefaultSettings(), cluster, settings, true);
- auto federatedQuerySetup = std::make_optional<TKqpFederatedQuerySetup>({NYql::IHTTPGateway::Make(), nullptr, nullptr, nullptr});
+ auto federatedQuerySetup = std::make_optional<TKqpFederatedQuerySetup>({NYql::IHTTPGateway::Make(), nullptr, nullptr, nullptr, {}});
return NKqp::CreateKqpHost(gateway, cluster, "/Root", kikimrConfig, moduleResolver,
federatedQuerySetup, funcRegistry, funcRegistry, keepConfigChanges);
}