aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorvitalyisaev <vitalyisaev@yandex-team.com>2023-09-01 12:24:13 +0300
committervitalyisaev <vitalyisaev@yandex-team.com>2023-09-01 12:47:31 +0300
commit41fb59788d7929ed2ef5a9382669414db05ed34c (patch)
tree3f3038f01642a200609a6f457dd949b769735d27
parentcc871496ef02852bf6ca96470fa61c253940ddf4 (diff)
downloadydb-41fb59788d7929ed2ef5a9382669414db05ed34c.tar.gz
YQ Connector: initialize Federated Query clients and actors in KQP
1. Появилась папка `ydb/core/kqp/federated_query`, где инкапсулировано всё необходимое для запуска Federated Query в KQP - HTTP и GRPC клиенты, там же создаются служебные акторы, такие как `DatabaseResolver` 2. Переходим на использование `TQueryServiceConfig`. Туда же перенесена ещё пара параметров из `FederatedQueryConfig`. 3. Юнит-тесты на S3 отрефакторены, выделена общая часть, которая будет использоваться в юнит-тестах на Generic провайдер.
-rw-r--r--ydb/core/driver_lib/run/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/core/driver_lib/run/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/core/driver_lib/run/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/core/driver_lib/run/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/core/driver_lib/run/kikimr_services_initializers.cpp6
-rw-r--r--ydb/core/driver_lib/run/ya.make1
-rw-r--r--ydb/core/fq/libs/actors/database_resolver.cpp5
-rw-r--r--ydb/core/fq/libs/actors/database_resolver.h1
-rw-r--r--ydb/core/fq/libs/actors/run_actor.cpp3
-rw-r--r--ydb/core/fq/libs/compute/common/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/core/fq/libs/compute/common/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/core/fq/libs/compute/common/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/core/fq/libs/compute/common/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/core/fq/libs/compute/common/run_actor_params.cpp3
-rw-r--r--ydb/core/fq/libs/compute/common/run_actor_params.h1
-rw-r--r--ydb/core/fq/libs/compute/common/ya.make1
-rw-r--r--ydb/core/fq/libs/db_id_async_resolver_impl/db_async_resolver_impl.cpp7
-rw-r--r--ydb/core/fq/libs/db_id_async_resolver_impl/db_async_resolver_impl.h5
-rw-r--r--ydb/core/fq/libs/test_connection/test_connection.cpp4
-rw-r--r--ydb/core/fq/libs/test_connection/test_connection.h2
-rw-r--r--ydb/core/kqp/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/core/kqp/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/core/kqp/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/core/kqp/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/core/kqp/compile_service/CMakeLists.darwin-x86_64.txt4
-rw-r--r--ydb/core/kqp/compile_service/CMakeLists.linux-aarch64.txt4
-rw-r--r--ydb/core/kqp/compile_service/CMakeLists.linux-x86_64.txt4
-rw-r--r--ydb/core/kqp/compile_service/CMakeLists.windows-x86_64.txt4
-rw-r--r--ydb/core/kqp/compile_service/kqp_compile_actor.cpp33
-rw-r--r--ydb/core/kqp/compile_service/kqp_compile_service.cpp21
-rw-r--r--ydb/core/kqp/compile_service/kqp_compile_service.h13
-rw-r--r--ydb/core/kqp/compile_service/ya.make4
-rw-r--r--ydb/core/kqp/compute_actor/CMakeLists.darwin-x86_64.txt3
-rw-r--r--ydb/core/kqp/compute_actor/CMakeLists.linux-aarch64.txt3
-rw-r--r--ydb/core/kqp/compute_actor/CMakeLists.linux-x86_64.txt3
-rw-r--r--ydb/core/kqp/compute_actor/CMakeLists.windows-x86_64.txt3
-rw-r--r--ydb/core/kqp/compute_actor/kqp_compute_actor.cpp18
-rw-r--r--ydb/core/kqp/compute_actor/kqp_compute_actor.h11
-rw-r--r--ydb/core/kqp/compute_actor/ya.make3
-rw-r--r--ydb/core/kqp/federated_query/CMakeLists.darwin-x86_64.txt26
-rw-r--r--ydb/core/kqp/federated_query/CMakeLists.linux-aarch64.txt27
-rw-r--r--ydb/core/kqp/federated_query/CMakeLists.linux-x86_64.txt27
-rw-r--r--ydb/core/kqp/federated_query/CMakeLists.txt17
-rw-r--r--ydb/core/kqp/federated_query/CMakeLists.windows-x86_64.txt26
-rw-r--r--ydb/core/kqp/federated_query/kqp_federated_query_helpers.cpp122
-rw-r--r--ydb/core/kqp/federated_query/kqp_federated_query_helpers.h84
-rw-r--r--ydb/core/kqp/federated_query/ya.make18
-rw-r--r--ydb/core/kqp/host/CMakeLists.darwin-x86_64.txt2
-rw-r--r--ydb/core/kqp/host/CMakeLists.linux-aarch64.txt2
-rw-r--r--ydb/core/kqp/host/CMakeLists.linux-x86_64.txt2
-rw-r--r--ydb/core/kqp/host/CMakeLists.windows-x86_64.txt2
-rw-r--r--ydb/core/kqp/host/kqp_host.cpp47
-rw-r--r--ydb/core/kqp/host/kqp_host.h5
-rw-r--r--ydb/core/kqp/host/ya.make2
-rw-r--r--ydb/core/kqp/node_service/kqp_node_ut.cpp4
-rw-r--r--ydb/core/kqp/proxy_service/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/core/kqp/proxy_service/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/core/kqp/proxy_service/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/core/kqp/proxy_service/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/core/kqp/proxy_service/kqp_proxy_service.cpp67
-rw-r--r--ydb/core/kqp/proxy_service/kqp_proxy_service.h7
-rw-r--r--ydb/core/kqp/proxy_service/ya.make1
-rw-r--r--ydb/core/kqp/session_actor/CMakeLists.darwin-x86_64.txt2
-rw-r--r--ydb/core/kqp/session_actor/CMakeLists.linux-aarch64.txt2
-rw-r--r--ydb/core/kqp/session_actor/CMakeLists.linux-x86_64.txt2
-rw-r--r--ydb/core/kqp/session_actor/CMakeLists.windows-x86_64.txt2
-rw-r--r--ydb/core/kqp/session_actor/kqp_session_actor.cpp21
-rw-r--r--ydb/core/kqp/session_actor/kqp_session_actor.h12
-rw-r--r--ydb/core/kqp/session_actor/kqp_worker_actor.cpp23
-rw-r--r--ydb/core/kqp/session_actor/kqp_worker_common.h10
-rw-r--r--ydb/core/kqp/session_actor/ya.make2
-rw-r--r--ydb/core/kqp/ut/common/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/core/kqp/ut/common/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/core/kqp/ut/common/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/core/kqp/ut/common/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/core/kqp/ut/common/kqp_ut_common.cpp4
-rw-r--r--ydb/core/kqp/ut/common/kqp_ut_common.h8
-rw-r--r--ydb/core/kqp/ut/common/ya.make1
-rw-r--r--ydb/core/kqp/ut/federated_query/CMakeLists.txt11
-rw-r--r--ydb/core/kqp/ut/federated_query/common/CMakeLists.darwin-x86_64.txt23
-rw-r--r--ydb/core/kqp/ut/federated_query/common/CMakeLists.linux-aarch64.txt24
-rw-r--r--ydb/core/kqp/ut/federated_query/common/CMakeLists.linux-x86_64.txt24
-rw-r--r--ydb/core/kqp/ut/federated_query/common/CMakeLists.txt17
-rw-r--r--ydb/core/kqp/ut/federated_query/common/CMakeLists.windows-x86_64.txt23
-rw-r--r--ydb/core/kqp/ut/federated_query/common/common.cpp44
-rw-r--r--ydb/core/kqp/ut/federated_query/common/common.h19
-rw-r--r--ydb/core/kqp/ut/federated_query/common/ya.make15
-rwxr-xr-xydb/core/kqp/ut/federated_query/run_testpack28
-rw-r--r--ydb/core/kqp/ut/federated_query/s3/CMakeLists.darwin-x86_64.txt (renamed from ydb/core/kqp/ut/federated_query/CMakeLists.darwin-x86_64.txt)32
-rw-r--r--ydb/core/kqp/ut/federated_query/s3/CMakeLists.linux-aarch64.txt (renamed from ydb/core/kqp/ut/federated_query/CMakeLists.linux-aarch64.txt)32
-rw-r--r--ydb/core/kqp/ut/federated_query/s3/CMakeLists.linux-x86_64.txt (renamed from ydb/core/kqp/ut/federated_query/CMakeLists.linux-x86_64.txt)32
-rw-r--r--ydb/core/kqp/ut/federated_query/s3/CMakeLists.txt17
-rw-r--r--ydb/core/kqp/ut/federated_query/s3/CMakeLists.windows-x86_64.txt (renamed from ydb/core/kqp/ut/federated_query/CMakeLists.windows-x86_64.txt)30
-rw-r--r--ydb/core/kqp/ut/federated_query/s3/kqp_federated_query_ut.cpp (renamed from ydb/core/kqp/ut/federated_query/kqp_federated_query_ut.cpp)150
-rw-r--r--ydb/core/kqp/ut/federated_query/s3/ya.make28
-rw-r--r--ydb/core/kqp/ut/federated_query/ya.make29
-rw-r--r--ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp5
-rw-r--r--ydb/core/kqp/ya.make1
-rw-r--r--ydb/core/protos/config.proto2
-rw-r--r--ydb/core/testlib/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/core/testlib/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/core/testlib/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/core/testlib/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/core/testlib/test_client.cpp4
-rw-r--r--ydb/core/testlib/test_client.h3
-rw-r--r--ydb/core/testlib/ya.make1
-rw-r--r--ydb/core/tx/datashard/datashard_ut_kqp_errors.cpp4
-rw-r--r--ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver.h2
108 files changed, 970 insertions, 401 deletions
diff --git a/ydb/core/driver_lib/run/CMakeLists.darwin-x86_64.txt b/ydb/core/driver_lib/run/CMakeLists.darwin-x86_64.txt
index 7fe9caae62a..44e811c14f9 100644
--- a/ydb/core/driver_lib/run/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/driver_lib/run/CMakeLists.darwin-x86_64.txt
@@ -68,6 +68,7 @@ target_link_libraries(run PUBLIC
ydb-core-keyvalue
ydb-core-kafka_proxy
ydb-core-kqp
+ core-kqp-federated_query
core-kqp-rm_service
ydb-core-load_test
ydb-core-local_pgwire
diff --git a/ydb/core/driver_lib/run/CMakeLists.linux-aarch64.txt b/ydb/core/driver_lib/run/CMakeLists.linux-aarch64.txt
index 5d41e085cfa..86b089d2ee7 100644
--- a/ydb/core/driver_lib/run/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/driver_lib/run/CMakeLists.linux-aarch64.txt
@@ -69,6 +69,7 @@ target_link_libraries(run PUBLIC
ydb-core-keyvalue
ydb-core-kafka_proxy
ydb-core-kqp
+ core-kqp-federated_query
core-kqp-rm_service
ydb-core-load_test
ydb-core-local_pgwire
diff --git a/ydb/core/driver_lib/run/CMakeLists.linux-x86_64.txt b/ydb/core/driver_lib/run/CMakeLists.linux-x86_64.txt
index 5d41e085cfa..86b089d2ee7 100644
--- a/ydb/core/driver_lib/run/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/driver_lib/run/CMakeLists.linux-x86_64.txt
@@ -69,6 +69,7 @@ target_link_libraries(run PUBLIC
ydb-core-keyvalue
ydb-core-kafka_proxy
ydb-core-kqp
+ core-kqp-federated_query
core-kqp-rm_service
ydb-core-load_test
ydb-core-local_pgwire
diff --git a/ydb/core/driver_lib/run/CMakeLists.windows-x86_64.txt b/ydb/core/driver_lib/run/CMakeLists.windows-x86_64.txt
index 7fe9caae62a..44e811c14f9 100644
--- a/ydb/core/driver_lib/run/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/driver_lib/run/CMakeLists.windows-x86_64.txt
@@ -68,6 +68,7 @@ target_link_libraries(run PUBLIC
ydb-core-keyvalue
ydb-core-kafka_proxy
ydb-core-kqp
+ core-kqp-federated_query
core-kqp-rm_service
ydb-core-load_test
ydb-core-local_pgwire
diff --git a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp
index 44af92665df..10383b4a19d 100644
--- a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp
+++ b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp
@@ -2105,8 +2105,10 @@ void TKqpServiceInitializer::InitializeServices(NActors::TActorSystemSetup* setu
GlobalObjects.AddGlobalObject(std::make_shared<NYql::NLog::YqlLoggerScope>(
new NYql::NLog::TTlsLogBackend(new TNullLogBackend())));
- auto proxy = NKqp::CreateKqpProxyService(Config.GetLogConfig(), Config.GetTableServiceConfig(), Config.GetAuthConfig().GetTokenAccessorConfig(),
- Config.GetQueryServiceConfig(), Config.GetMetadataProviderConfig(), std::move(settings), Factories->QueryReplayBackendFactory, std::move(kqpProxySharedResources));
+ auto proxy = NKqp::CreateKqpProxyService(Config.GetLogConfig(), Config.GetTableServiceConfig(),
+ Config.GetQueryServiceConfig(), Config.GetMetadataProviderConfig(), std::move(settings), Factories->QueryReplayBackendFactory, std::move(kqpProxySharedResources),
+ NKqp::MakeKqpFederatedQuerySetupFactory(setup, appData, Config)
+ );
setup->LocalServices.push_back(std::make_pair(
NKqp::MakeKqpProxyID(NodeId),
TActorSetupCmd(proxy, TMailboxType::HTSwap, appData->UserPoolId)));
diff --git a/ydb/core/driver_lib/run/ya.make b/ydb/core/driver_lib/run/ya.make
index 4635d10cfce..14240062100 100644
--- a/ydb/core/driver_lib/run/ya.make
+++ b/ydb/core/driver_lib/run/ya.make
@@ -83,6 +83,7 @@ PEERDIR(
ydb/core/keyvalue
ydb/core/kafka_proxy
ydb/core/kqp
+ ydb/core/kqp/federated_query
ydb/core/kqp/rm_service
ydb/core/load_test
ydb/core/local_pgwire
diff --git a/ydb/core/fq/libs/actors/database_resolver.cpp b/ydb/core/fq/libs/actors/database_resolver.cpp
index 9591ff9b51d..0ebbdc0e653 100644
--- a/ydb/core/fq/libs/actors/database_resolver.cpp
+++ b/ydb/core/fq/libs/actors/database_resolver.cpp
@@ -261,7 +261,6 @@ public:
TString endpoint;
TVector<TString> aliveHosts;
for (const auto& host : databaseInfo.GetMap().at("hosts").GetArraySafe()) {
-
// all host services must be alive
bool alive = true;
for (const auto& service: host.GetMap().at("services").GetArraySafe()) {
@@ -397,4 +396,8 @@ NActors::IActor* CreateDatabaseResolver(NActors::TActorId httpProxy, ISecuredSer
return new TDatabaseResolver(httpProxy, credentialsFactory);
}
+NActors::TActorId MakeDatabaseResolverActorId() {
+ return NActors::TActorId(0, "DBRESOLVER");
+}
+
} /* namespace NFq */
diff --git a/ydb/core/fq/libs/actors/database_resolver.h b/ydb/core/fq/libs/actors/database_resolver.h
index 21035416820..f7ebd421a19 100644
--- a/ydb/core/fq/libs/actors/database_resolver.h
+++ b/ydb/core/fq/libs/actors/database_resolver.h
@@ -7,5 +7,6 @@
namespace NFq {
NActors::IActor* CreateDatabaseResolver(NActors::TActorId httpProxy, NYql::ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory);
+NActors::TActorId MakeDatabaseResolverActorId();
} /* namespace NFq */
diff --git a/ydb/core/fq/libs/actors/run_actor.cpp b/ydb/core/fq/libs/actors/run_actor.cpp
index 7fea6502a66..689d17c3da7 100644
--- a/ydb/core/fq/libs/actors/run_actor.cpp
+++ b/ydb/core/fq/libs/actors/run_actor.cpp
@@ -59,7 +59,6 @@
#include <ydb/core/fq/libs/control_plane_storage/events/events.h>
#include <ydb/core/fq/libs/control_plane_storage/util.h>
#include <ydb/core/fq/libs/db_id_async_resolver_impl/db_async_resolver_impl.h>
-#include <ydb/core/fq/libs/db_id_async_resolver_impl/mdb_endpoint_generator.h>
#include <ydb/core/fq/libs/gateway/empty_gateway.h>
#include <ydb/core/fq/libs/private_client/events.h>
#include <ydb/core/fq/libs/private_client/private_client.h>
@@ -1832,7 +1831,7 @@ private:
Params.DatabaseResolver,
Params.Config.GetCommon().GetYdbMvpCloudEndpoint(),
Params.Config.GetCommon().GetMdbGateway(),
- NFq::MakeMdbEndpointGeneratorGeneric(Params.Config.GetCommon().GetMdbTransformHost()),
+ Params.MdbEndpointGenerator,
Params.QueryId);
{
// TBD: move init to better place
diff --git a/ydb/core/fq/libs/compute/common/CMakeLists.darwin-x86_64.txt b/ydb/core/fq/libs/compute/common/CMakeLists.darwin-x86_64.txt
index df13fd872f3..f559b414c0e 100644
--- a/ydb/core/fq/libs/compute/common/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/fq/libs/compute/common/CMakeLists.darwin-x86_64.txt
@@ -16,6 +16,7 @@ target_link_libraries(libs-compute-common PUBLIC
contrib-libs-cxxsupp
yutil
libs-config-protos
+ fq-libs-db_id_async_resolver_impl
fq-libs-grpc
fq-libs-shared_resources
providers-dq-provider
diff --git a/ydb/core/fq/libs/compute/common/CMakeLists.linux-aarch64.txt b/ydb/core/fq/libs/compute/common/CMakeLists.linux-aarch64.txt
index db45ab0353d..387af424f08 100644
--- a/ydb/core/fq/libs/compute/common/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/fq/libs/compute/common/CMakeLists.linux-aarch64.txt
@@ -17,6 +17,7 @@ target_link_libraries(libs-compute-common PUBLIC
contrib-libs-cxxsupp
yutil
libs-config-protos
+ fq-libs-db_id_async_resolver_impl
fq-libs-grpc
fq-libs-shared_resources
providers-dq-provider
diff --git a/ydb/core/fq/libs/compute/common/CMakeLists.linux-x86_64.txt b/ydb/core/fq/libs/compute/common/CMakeLists.linux-x86_64.txt
index db45ab0353d..387af424f08 100644
--- a/ydb/core/fq/libs/compute/common/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/fq/libs/compute/common/CMakeLists.linux-x86_64.txt
@@ -17,6 +17,7 @@ target_link_libraries(libs-compute-common PUBLIC
contrib-libs-cxxsupp
yutil
libs-config-protos
+ fq-libs-db_id_async_resolver_impl
fq-libs-grpc
fq-libs-shared_resources
providers-dq-provider
diff --git a/ydb/core/fq/libs/compute/common/CMakeLists.windows-x86_64.txt b/ydb/core/fq/libs/compute/common/CMakeLists.windows-x86_64.txt
index df13fd872f3..f559b414c0e 100644
--- a/ydb/core/fq/libs/compute/common/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/fq/libs/compute/common/CMakeLists.windows-x86_64.txt
@@ -16,6 +16,7 @@ target_link_libraries(libs-compute-common PUBLIC
contrib-libs-cxxsupp
yutil
libs-config-protos
+ fq-libs-db_id_async_resolver_impl
fq-libs-grpc
fq-libs-shared_resources
providers-dq-provider
diff --git a/ydb/core/fq/libs/compute/common/run_actor_params.cpp b/ydb/core/fq/libs/compute/common/run_actor_params.cpp
index b621f895c81..3bd287188fe 100644
--- a/ydb/core/fq/libs/compute/common/run_actor_params.cpp
+++ b/ydb/core/fq/libs/compute/common/run_actor_params.cpp
@@ -1,5 +1,7 @@
#include "run_actor_params.h"
+#include <ydb/core/fq/libs/db_id_async_resolver_impl/mdb_endpoint_generator.h>
+
namespace NFq {
using namespace NActors;
@@ -71,6 +73,7 @@ TRunActorParams::TRunActorParams(
, Scope(scope)
, AuthToken(authToken)
, DatabaseResolver(databaseResolver)
+ , MdbEndpointGenerator(NFq::MakeMdbEndpointGeneratorGeneric(config.GetCommon().GetMdbTransformHost()))
, QueryId(queryId)
, UserId(userId)
, Owner(owner)
diff --git a/ydb/core/fq/libs/compute/common/run_actor_params.h b/ydb/core/fq/libs/compute/common/run_actor_params.h
index f3c1f54dbe1..8a5a7913f7d 100644
--- a/ydb/core/fq/libs/compute/common/run_actor_params.h
+++ b/ydb/core/fq/libs/compute/common/run_actor_params.h
@@ -98,6 +98,7 @@ struct TRunActorParams { // TODO2 : Change name
const NYdb::NFq::TScope Scope;
const TString AuthToken;
const NActors::TActorId DatabaseResolver;
+ const NYql::IMdbEndpointGenerator::TPtr MdbEndpointGenerator;
const TString QueryId;
const TString UserId;
const TString Owner;
diff --git a/ydb/core/fq/libs/compute/common/ya.make b/ydb/core/fq/libs/compute/common/ya.make
index a00549a2742..ebb41b1066a 100644
--- a/ydb/core/fq/libs/compute/common/ya.make
+++ b/ydb/core/fq/libs/compute/common/ya.make
@@ -8,6 +8,7 @@ SRCS(
PEERDIR(
ydb/core/fq/libs/config/protos
+ ydb/core/fq/libs/db_id_async_resolver_impl
ydb/core/fq/libs/grpc
ydb/core/fq/libs/shared_resources
ydb/library/yql/providers/dq/provider
diff --git a/ydb/core/fq/libs/db_id_async_resolver_impl/db_async_resolver_impl.cpp b/ydb/core/fq/libs/db_id_async_resolver_impl/db_async_resolver_impl.cpp
index 97b9f1f40ab..814040847d3 100644
--- a/ydb/core/fq/libs/db_id_async_resolver_impl/db_async_resolver_impl.cpp
+++ b/ydb/core/fq/libs/db_id_async_resolver_impl/db_async_resolver_impl.cpp
@@ -1,6 +1,7 @@
#include "db_async_resolver_impl.h"
#include <library/cpp/actors/core/actorsystem.h>
+#include <ydb/core/fq/libs/events/events.h>
namespace NFq {
using namespace NThreading;
@@ -10,13 +11,13 @@ TDatabaseAsyncResolverImpl::TDatabaseAsyncResolverImpl(
const NActors::TActorId& recipient,
const TString& ydbMvpEndpoint,
const TString& mdbGateway,
- NYql::IMdbEndpointGenerator::TPtr&& mdbEndpointGenerator,
+ const NYql::IMdbEndpointGenerator::TPtr& mdbEndpointGenerator,
const TString& traceId)
: ActorSystem(actorSystem)
, Recipient(recipient)
, YdbMvpEndpoint(ydbMvpEndpoint)
, MdbGateway(mdbGateway)
- , mdbEndpointGenerator(std::move(mdbEndpointGenerator))
+ , MdbEndpointGenerator(mdbEndpointGenerator)
, TraceId(traceId)
{
}
@@ -45,7 +46,7 @@ TFuture<NYql::TDatabaseResolverResponse> TDatabaseAsyncResolverImpl::ResolveIds(
ActorSystem->Send(new NActors::IEventHandle(Recipient, callbackId,
new TEvents::TEvEndpointRequest(ids, YdbMvpEndpoint, MdbGateway,
- TraceId, mdbEndpointGenerator)));
+ TraceId, MdbEndpointGenerator)));
return promise.GetFuture();
}
diff --git a/ydb/core/fq/libs/db_id_async_resolver_impl/db_async_resolver_impl.h b/ydb/core/fq/libs/db_id_async_resolver_impl/db_async_resolver_impl.h
index 9e86ab16c0b..f22f4e764a2 100644
--- a/ydb/core/fq/libs/db_id_async_resolver_impl/db_async_resolver_impl.h
+++ b/ydb/core/fq/libs/db_id_async_resolver_impl/db_async_resolver_impl.h
@@ -1,6 +1,5 @@
#pragma once
-#include <ydb/core/fq/libs/events/events.h>
#include <ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver.h>
#include <ydb/library/yql/providers/common/db_id_async_resolver/mdb_endpoint_generator.h>
#include <ydb/library/yql/providers/dq/actors/actor_helpers.h>
@@ -14,7 +13,7 @@ public:
const NActors::TActorId& recipient,
const TString& ydbMvpEndpoint,
const TString& mdbGateway,
- NYql::IMdbEndpointGenerator::TPtr&& endpointGenerator,
+ const NYql::IMdbEndpointGenerator::TPtr& endpointGenerator,
const TString& traceId = ""
);
@@ -24,7 +23,7 @@ private:
const NActors::TActorId Recipient;
const TString YdbMvpEndpoint;
const TString MdbGateway;
- NYql::IMdbEndpointGenerator::TPtr mdbEndpointGenerator;
+ const NYql::IMdbEndpointGenerator::TPtr MdbEndpointGenerator;
const TString TraceId;
};
diff --git a/ydb/core/fq/libs/test_connection/test_connection.cpp b/ydb/core/fq/libs/test_connection/test_connection.cpp
index 2d9621b4ada..521442ade45 100644
--- a/ydb/core/fq/libs/test_connection/test_connection.cpp
+++ b/ydb/core/fq/libs/test_connection/test_connection.cpp
@@ -107,6 +107,7 @@ class TTestConnectionActor : public NActors::TActorBootstrapped<TTestConnectionA
TActorId DatabaseResolverActor;
std::shared_ptr<NYql::IDatabaseAsyncResolver> DbResolver;
NYql::IHTTPGateway::TPtr HttpGateway;
+ NYql::IMdbEndpointGenerator::TPtr MdbEndpointGenerator;
public:
TTestConnectionActor(
@@ -131,6 +132,7 @@ public:
, Counters(counters)
, Signer(signer)
, HttpGateway(httpGateway)
+ , MdbEndpointGenerator(NFq::MakeMdbEndpointGeneratorGeneric(commonConfig.GetMdbTransformHost()))
{}
static constexpr char ActorName[] = "YQ_TEST_CONNECTION";
@@ -144,7 +146,7 @@ public:
DbResolver = std::make_shared<NFq::TDatabaseAsyncResolverImpl>(
NActors::TActivationContext::ActorSystem(), DatabaseResolverActor,
CommonConfig.GetYdbMvpCloudEndpoint(), CommonConfig.GetMdbGateway(),
- NFq::MakeMdbEndpointGeneratorGeneric(CommonConfig.GetMdbTransformHost())
+ MdbEndpointGenerator
);
Become(&TTestConnectionActor::StateFunc);
diff --git a/ydb/core/fq/libs/test_connection/test_connection.h b/ydb/core/fq/libs/test_connection/test_connection.h
index 65517cc48b9..d3fd0c5c907 100644
--- a/ydb/core/fq/libs/test_connection/test_connection.h
+++ b/ydb/core/fq/libs/test_connection/test_connection.h
@@ -7,6 +7,8 @@
#include <ydb/core/fq/libs/db_id_async_resolver_impl/db_async_resolver_impl.h>
#include <ydb/core/fq/libs/shared_resources/shared_resources.h>
#include <ydb/core/fq/libs/signer/signer.h>
+#include <ydb/library/yql/minikql/mkql_function_registry.h>
+#include <ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h>
#include <ydb/library/yql/providers/common/token_accessor/client/factory.h>
#include <ydb/library/yql/providers/pq/cm_client/client.h>
#include <ydb/public/api/protos/draft/fq.pb.h>
diff --git a/ydb/core/kqp/CMakeLists.darwin-x86_64.txt b/ydb/core/kqp/CMakeLists.darwin-x86_64.txt
index e82ec0d41ac..1bad2f50459 100644
--- a/ydb/core/kqp/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/kqp/CMakeLists.darwin-x86_64.txt
@@ -12,6 +12,7 @@ add_subdirectory(compute_actor)
add_subdirectory(counters)
add_subdirectory(executer_actor)
add_subdirectory(expr_nodes)
+add_subdirectory(federated_query)
add_subdirectory(gateway)
add_subdirectory(host)
add_subdirectory(node_service)
diff --git a/ydb/core/kqp/CMakeLists.linux-aarch64.txt b/ydb/core/kqp/CMakeLists.linux-aarch64.txt
index 9e4b855487f..5337a48cda3 100644
--- a/ydb/core/kqp/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/kqp/CMakeLists.linux-aarch64.txt
@@ -12,6 +12,7 @@ add_subdirectory(compute_actor)
add_subdirectory(counters)
add_subdirectory(executer_actor)
add_subdirectory(expr_nodes)
+add_subdirectory(federated_query)
add_subdirectory(gateway)
add_subdirectory(host)
add_subdirectory(node_service)
diff --git a/ydb/core/kqp/CMakeLists.linux-x86_64.txt b/ydb/core/kqp/CMakeLists.linux-x86_64.txt
index 9e4b855487f..5337a48cda3 100644
--- a/ydb/core/kqp/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/kqp/CMakeLists.linux-x86_64.txt
@@ -12,6 +12,7 @@ add_subdirectory(compute_actor)
add_subdirectory(counters)
add_subdirectory(executer_actor)
add_subdirectory(expr_nodes)
+add_subdirectory(federated_query)
add_subdirectory(gateway)
add_subdirectory(host)
add_subdirectory(node_service)
diff --git a/ydb/core/kqp/CMakeLists.windows-x86_64.txt b/ydb/core/kqp/CMakeLists.windows-x86_64.txt
index e82ec0d41ac..1bad2f50459 100644
--- a/ydb/core/kqp/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/kqp/CMakeLists.windows-x86_64.txt
@@ -12,6 +12,7 @@ add_subdirectory(compute_actor)
add_subdirectory(counters)
add_subdirectory(executer_actor)
add_subdirectory(expr_nodes)
+add_subdirectory(federated_query)
add_subdirectory(gateway)
add_subdirectory(host)
add_subdirectory(node_service)
diff --git a/ydb/core/kqp/compile_service/CMakeLists.darwin-x86_64.txt b/ydb/core/kqp/compile_service/CMakeLists.darwin-x86_64.txt
index 0dfabe36a35..1288e5d8438 100644
--- a/ydb/core/kqp/compile_service/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/kqp/compile_service/CMakeLists.darwin-x86_64.txt
@@ -16,9 +16,9 @@ target_link_libraries(core-kqp-compile_service PUBLIC
yutil
ydb-core-actorlib_impl
ydb-core-base
- core-kqp-host
kqp-common-simple
- providers-common-http_gateway
+ core-kqp-federated_query
+ core-kqp-host
)
target_sources(core-kqp-compile_service PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kqp/compile_service/kqp_compile_actor.cpp
diff --git a/ydb/core/kqp/compile_service/CMakeLists.linux-aarch64.txt b/ydb/core/kqp/compile_service/CMakeLists.linux-aarch64.txt
index e3c1d4e8e3d..04756092e49 100644
--- a/ydb/core/kqp/compile_service/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/kqp/compile_service/CMakeLists.linux-aarch64.txt
@@ -17,9 +17,9 @@ target_link_libraries(core-kqp-compile_service PUBLIC
yutil
ydb-core-actorlib_impl
ydb-core-base
- core-kqp-host
kqp-common-simple
- providers-common-http_gateway
+ core-kqp-federated_query
+ core-kqp-host
)
target_sources(core-kqp-compile_service PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kqp/compile_service/kqp_compile_actor.cpp
diff --git a/ydb/core/kqp/compile_service/CMakeLists.linux-x86_64.txt b/ydb/core/kqp/compile_service/CMakeLists.linux-x86_64.txt
index e3c1d4e8e3d..04756092e49 100644
--- a/ydb/core/kqp/compile_service/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/kqp/compile_service/CMakeLists.linux-x86_64.txt
@@ -17,9 +17,9 @@ target_link_libraries(core-kqp-compile_service PUBLIC
yutil
ydb-core-actorlib_impl
ydb-core-base
- core-kqp-host
kqp-common-simple
- providers-common-http_gateway
+ core-kqp-federated_query
+ core-kqp-host
)
target_sources(core-kqp-compile_service PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kqp/compile_service/kqp_compile_actor.cpp
diff --git a/ydb/core/kqp/compile_service/CMakeLists.windows-x86_64.txt b/ydb/core/kqp/compile_service/CMakeLists.windows-x86_64.txt
index 0dfabe36a35..1288e5d8438 100644
--- a/ydb/core/kqp/compile_service/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/kqp/compile_service/CMakeLists.windows-x86_64.txt
@@ -16,9 +16,9 @@ target_link_libraries(core-kqp-compile_service PUBLIC
yutil
ydb-core-actorlib_impl
ydb-core-base
- core-kqp-host
kqp-common-simple
- providers-common-http_gateway
+ core-kqp-federated_query
+ core-kqp-host
)
target_sources(core-kqp-compile_service PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kqp/compile_service/kqp_compile_actor.cpp
diff --git a/ydb/core/kqp/compile_service/kqp_compile_actor.cpp b/ydb/core/kqp/compile_service/kqp_compile_actor.cpp
index 7de2211aa82..22894aefe6f 100644
--- a/ydb/core/kqp/compile_service/kqp_compile_actor.cpp
+++ b/ydb/core/kqp/compile_service/kqp_compile_actor.cpp
@@ -57,18 +57,19 @@ public:
return NKikimrServices::TActivity::KQP_COMPILE_ACTOR;
}
- TKqpCompileActor(const TActorId& owner, const TKqpSettings::TConstPtr& kqpSettings, const TTableServiceConfig& serviceConfig,
- const NKikimrConfig::TMetadataProviderConfig& metadataProviderConfig, NYql::IHTTPGateway::TPtr httpGateway,
+ TKqpCompileActor(const TActorId& owner, const TKqpSettings::TConstPtr& kqpSettings,
+ const TTableServiceConfig& serviceConfig,
+ const NKikimrConfig::TMetadataProviderConfig& metadataProviderConfig,
TIntrusivePtr<TModuleResolverState> moduleResolverState, TIntrusivePtr<TKqpCounters> counters,
- NYql::ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory,
const TString& uid, const TKqpQueryId& queryId,
const TIntrusiveConstPtr<NACLib::TUserToken>& userToken,
- TKqpDbCountersPtr dbCounters, NWilson::TTraceId traceId, TKqpTempTablesState::TConstPtr tempTablesState)
+ TKqpDbCountersPtr dbCounters,
+ std::optional<TKqpFederatedQuerySetup> federatedQuerySetup,
+ NWilson::TTraceId traceId, TKqpTempTablesState::TConstPtr tempTablesState)
: Owner(owner)
- , HttpGateway(std::move(httpGateway))
, ModuleResolverState(moduleResolverState)
, Counters(counters)
- , CredentialsFactory(std::move(credentialsFactory))
+ , FederatedQuerySetup(federatedQuerySetup)
, Uid(uid)
, QueryId(queryId)
, QueryRef(QueryId.Text, QueryId.QueryParameterTypes)
@@ -126,7 +127,7 @@ public:
Config->FeatureFlags = AppData(ctx)->FeatureFlags;
KqpHost = CreateKqpHost(Gateway, QueryId.Cluster, QueryId.Database, Config, ModuleResolverState->ModuleResolver,
- HttpGateway, AppData(ctx)->FunctionRegistry, false, false, CredentialsFactory, std::move(TempTablesState));
+ FederatedQuerySetup, AppData(ctx)->FunctionRegistry, false, false, std::move(TempTablesState));
IKqpHost::TPrepareSettings prepareSettings;
prepareSettings.DocumentApiRestricted = QueryId.Settings.DocumentApiRestricted;
@@ -370,10 +371,9 @@ private:
private:
TActorId Owner;
- NYql::IHTTPGateway::TPtr HttpGateway;
TIntrusivePtr<TModuleResolverState> ModuleResolverState;
TIntrusivePtr<TKqpCounters> Counters;
- NYql::ISecuredServiceAccountCredentialsFactory::TPtr CredentialsFactory;
+ std::optional<TKqpFederatedQuerySetup> FederatedQuerySetup;
TString Uid;
TKqpQueryId QueryId;
TKqpQueryRef QueryRef;
@@ -420,16 +420,19 @@ void ApplyServiceConfig(TKikimrConfiguration& kqpConfig, const TTableServiceConf
kqpConfig.PredicateExtract20 = serviceConfig.GetPredicateExtract20();
}
-IActor* CreateKqpCompileActor(const TActorId& owner, const TKqpSettings::TConstPtr& kqpSettings, const TTableServiceConfig& serviceConfig,
- const NKikimrConfig::TMetadataProviderConfig& metadataProviderConfig, NYql::IHTTPGateway::TPtr httpGateway,
+IActor* CreateKqpCompileActor(const TActorId& owner, const TKqpSettings::TConstPtr& kqpSettings,
+ const TTableServiceConfig& serviceConfig,
+ const NKikimrConfig::TMetadataProviderConfig& metadataProviderConfig,
TIntrusivePtr<TModuleResolverState> moduleResolverState, TIntrusivePtr<TKqpCounters> counters,
- NYql::ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory,
const TString& uid, const TKqpQueryId& query, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken,
+ std::optional<TKqpFederatedQuerySetup> federatedQuerySetup,
TKqpDbCountersPtr dbCounters, NWilson::TTraceId traceId, TKqpTempTablesState::TConstPtr tempTablesState)
{
- return new TKqpCompileActor(owner, kqpSettings, serviceConfig, metadataProviderConfig, std::move(httpGateway), moduleResolverState,
- counters, std::move(credentialsFactory),
- uid, query, userToken, dbCounters, std::move(traceId), std::move(tempTablesState));
+ return new TKqpCompileActor(owner, kqpSettings, serviceConfig, metadataProviderConfig,
+ moduleResolverState, counters,
+ uid, query, userToken, dbCounters,
+ federatedQuerySetup,
+ std::move(traceId), std::move(tempTablesState));
}
} // namespace NKqp
diff --git a/ydb/core/kqp/compile_service/kqp_compile_service.cpp b/ydb/core/kqp/compile_service/kqp_compile_service.cpp
index 960fdc85151..7bd700c4a86 100644
--- a/ydb/core/kqp/compile_service/kqp_compile_service.cpp
+++ b/ydb/core/kqp/compile_service/kqp_compile_service.cpp
@@ -311,8 +311,8 @@ public:
const NKikimrConfig::TMetadataProviderConfig& metadataProviderConfig, const TKqpSettings::TConstPtr& kqpSettings,
TIntrusivePtr<TModuleResolverState> moduleResolverState, TIntrusivePtr<TKqpCounters> counters,
std::shared_ptr<IQueryReplayBackendFactory> queryReplayFactory,
- NYql::ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory,
- NYql::IHTTPGateway::TPtr httpGateway)
+ std::optional<TKqpFederatedQuerySetup> federatedQuerySetup
+ )
: Config(serviceConfig)
, MetadataProviderConfig(metadataProviderConfig)
, KqpSettings(kqpSettings)
@@ -321,8 +321,7 @@ public:
, QueryCache(Config.GetCompileQueryCacheSize(), TDuration::Seconds(Config.GetCompileQueryCacheTTLSec()))
, RequestsQueue(Config.GetCompileRequestQueueSize())
, QueryReplayFactory(std::move(queryReplayFactory))
- , CredentialsFactory(std::move(credentialsFactory))
- , HttpGateway(std::move(httpGateway))
+ , FederatedQuerySetup(federatedQuerySetup)
{}
void Bootstrap(const TActorContext& ctx) {
@@ -739,8 +738,9 @@ private:
}
void StartCompilation(TKqpCompileRequest&& request, const TActorContext& ctx) {
- auto compileActor = CreateKqpCompileActor(ctx.SelfID, KqpSettings, Config, MetadataProviderConfig, HttpGateway, ModuleResolverState, Counters,
- CredentialsFactory, request.Uid, request.Query, request.UserToken, request.DbCounters, request.CompileServiceSpan.GetTraceId(), std::move(request.TempTablesState));
+ auto compileActor = CreateKqpCompileActor(ctx.SelfID, KqpSettings, Config, MetadataProviderConfig, ModuleResolverState, Counters,
+ request.Uid, request.Query, request.UserToken, FederatedQuerySetup, request.DbCounters, request.CompileServiceSpan.GetTraceId(),
+ std::move(request.TempTablesState));
auto compileActorId = ctx.ExecutorThread.RegisterActor(compileActor, TMailboxType::HTSwap,
AppData(ctx)->UserPoolId);
@@ -828,19 +828,18 @@ private:
TKqpQueryCache QueryCache;
TKqpRequestsQueue RequestsQueue;
std::shared_ptr<IQueryReplayBackendFactory> QueryReplayFactory;
- NYql::ISecuredServiceAccountCredentialsFactory::TPtr CredentialsFactory;
- NYql::IHTTPGateway::TPtr HttpGateway;
+ std::optional<TKqpFederatedQuerySetup> FederatedQuerySetup;
};
IActor* CreateKqpCompileService(const TTableServiceConfig& serviceConfig,
const NKikimrConfig::TMetadataProviderConfig& metadataProviderConfig, const TKqpSettings::TConstPtr& kqpSettings,
TIntrusivePtr<TModuleResolverState> moduleResolverState, TIntrusivePtr<TKqpCounters> counters,
std::shared_ptr<IQueryReplayBackendFactory> queryReplayFactory,
- NYql::ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory,
- NYql::IHTTPGateway::TPtr httpGateway)
+ std::optional<TKqpFederatedQuerySetup> federatedQuerySetup
+ )
{
return new TKqpCompileService(serviceConfig, metadataProviderConfig, kqpSettings, moduleResolverState, counters,
- std::move(queryReplayFactory), std::move(credentialsFactory), std::move(httpGateway));
+ std::move(queryReplayFactory), federatedQuerySetup);
}
} // namespace NKqp
diff --git a/ydb/core/kqp/compile_service/kqp_compile_service.h b/ydb/core/kqp/compile_service/kqp_compile_service.h
index 32c28748e19..ff5ac1f218b 100644
--- a/ydb/core/kqp/compile_service/kqp_compile_service.h
+++ b/ydb/core/kqp/compile_service/kqp_compile_service.h
@@ -3,8 +3,7 @@
#include <ydb/core/kqp/common/kqp.h>
#include <ydb/core/kqp/common/simple/temp_tables.h>
#include <ydb/core/kqp/gateway/kqp_gateway.h>
-#include <ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h>
-#include <ydb/library/yql/providers/common/token_accessor/client/factory.h>
+#include <ydb/core/kqp/federated_query/kqp_federated_query_helpers.h>
namespace NKikimr {
namespace NKqp {
@@ -13,19 +12,19 @@ IActor* CreateKqpCompileService(const NKikimrConfig::TTableServiceConfig& servic
const NKikimrConfig::TMetadataProviderConfig& metadataProviderConfig,
const TKqpSettings::TConstPtr& kqpSettings, TIntrusivePtr<TModuleResolverState> moduleResolverState,
TIntrusivePtr<TKqpCounters> counters, std::shared_ptr<IQueryReplayBackendFactory> queryReplayFactory,
- NYql::ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory,
- NYql::IHTTPGateway::TPtr httpGateway);
+ std::optional<TKqpFederatedQuerySetup> federatedQuerySetup
+ );
IActor* CreateKqpCompileComputationPatternService(const NKikimrConfig::TTableServiceConfig& serviceConfig,
TIntrusivePtr<TKqpCounters> counters);
IActor* CreateKqpCompileActor(const TActorId& owner, const TKqpSettings::TConstPtr& kqpSettings,
- const NKikimrConfig::TTableServiceConfig& serviceConfig,
- const NKikimrConfig::TMetadataProviderConfig& metadataProviderConfig, NYql::IHTTPGateway::TPtr httpGateway,
+ const NKikimrConfig::TTableServiceConfig& serviceConfig,
+ const NKikimrConfig::TMetadataProviderConfig& metadataProviderConfig,
TIntrusivePtr<TModuleResolverState> moduleResolverState, TIntrusivePtr<TKqpCounters> counters,
- NYql::ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory,
const TString& uid, const TKqpQueryId& query,
const TIntrusiveConstPtr<NACLib::TUserToken>& userToken,
+ std::optional<TKqpFederatedQuerySetup> federatedQuerySetup,
TKqpDbCountersPtr dbCounters, NWilson::TTraceId traceId = {},
TKqpTempTablesState::TConstPtr tempTablesState = nullptr);
diff --git a/ydb/core/kqp/compile_service/ya.make b/ydb/core/kqp/compile_service/ya.make
index 9eee94d92b5..53ba78b8be3 100644
--- a/ydb/core/kqp/compile_service/ya.make
+++ b/ydb/core/kqp/compile_service/ya.make
@@ -9,9 +9,9 @@ SRCS(
PEERDIR(
ydb/core/actorlib_impl
ydb/core/base
- ydb/core/kqp/host
ydb/core/kqp/common/simple
- ydb/library/yql/providers/common/http_gateway
+ ydb/core/kqp/federated_query
+ ydb/core/kqp/host
)
YQL_LAST_ABI_VERSION()
diff --git a/ydb/core/kqp/compute_actor/CMakeLists.darwin-x86_64.txt b/ydb/core/kqp/compute_actor/CMakeLists.darwin-x86_64.txt
index cbd0d6bc71d..c27956e5aa4 100644
--- a/ydb/core/kqp/compute_actor/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/kqp/compute_actor/CMakeLists.darwin-x86_64.txt
@@ -22,11 +22,12 @@ target_link_libraries(core-kqp-compute_actor PUBLIC
yutil
ydb-core-actorlib_impl
ydb-core-base
+ core-kqp-federated_query
core-kqp-runtime
core-tx-datashard
core-tx-scheme_cache
dq-actors-compute
- providers-common-http_gateway
+ providers-generic-actors
providers-s3-actors
yql-public-issue
tools-enum_parser-enum_serialization_runtime
diff --git a/ydb/core/kqp/compute_actor/CMakeLists.linux-aarch64.txt b/ydb/core/kqp/compute_actor/CMakeLists.linux-aarch64.txt
index 8b25a121abc..4eb2135274e 100644
--- a/ydb/core/kqp/compute_actor/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/kqp/compute_actor/CMakeLists.linux-aarch64.txt
@@ -23,11 +23,12 @@ target_link_libraries(core-kqp-compute_actor PUBLIC
yutil
ydb-core-actorlib_impl
ydb-core-base
+ core-kqp-federated_query
core-kqp-runtime
core-tx-datashard
core-tx-scheme_cache
dq-actors-compute
- providers-common-http_gateway
+ providers-generic-actors
providers-s3-actors
yql-public-issue
tools-enum_parser-enum_serialization_runtime
diff --git a/ydb/core/kqp/compute_actor/CMakeLists.linux-x86_64.txt b/ydb/core/kqp/compute_actor/CMakeLists.linux-x86_64.txt
index 8b25a121abc..4eb2135274e 100644
--- a/ydb/core/kqp/compute_actor/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/kqp/compute_actor/CMakeLists.linux-x86_64.txt
@@ -23,11 +23,12 @@ target_link_libraries(core-kqp-compute_actor PUBLIC
yutil
ydb-core-actorlib_impl
ydb-core-base
+ core-kqp-federated_query
core-kqp-runtime
core-tx-datashard
core-tx-scheme_cache
dq-actors-compute
- providers-common-http_gateway
+ providers-generic-actors
providers-s3-actors
yql-public-issue
tools-enum_parser-enum_serialization_runtime
diff --git a/ydb/core/kqp/compute_actor/CMakeLists.windows-x86_64.txt b/ydb/core/kqp/compute_actor/CMakeLists.windows-x86_64.txt
index cbd0d6bc71d..c27956e5aa4 100644
--- a/ydb/core/kqp/compute_actor/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/kqp/compute_actor/CMakeLists.windows-x86_64.txt
@@ -22,11 +22,12 @@ target_link_libraries(core-kqp-compute_actor PUBLIC
yutil
ydb-core-actorlib_impl
ydb-core-base
+ core-kqp-federated_query
core-kqp-runtime
core-tx-datashard
core-tx-scheme_cache
dq-actors-compute
- providers-common-http_gateway
+ providers-generic-actors
providers-s3-actors
yql-public-issue
tools-enum_parser-enum_serialization_runtime
diff --git a/ydb/core/kqp/compute_actor/kqp_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_compute_actor.cpp
index 97e6c30fa09..d0232c0ea66 100644
--- a/ydb/core/kqp/compute_actor/kqp_compute_actor.cpp
+++ b/ydb/core/kqp/compute_actor/kqp_compute_actor.cpp
@@ -10,6 +10,8 @@
#include <ydb/core/kqp/runtime/kqp_stream_lookup_factory.h>
#include <ydb/library/yql/providers/s3/actors/yql_s3_sink_factory.h>
#include <ydb/library/yql/providers/s3/actors/yql_s3_source_factory.h>
+#include <ydb/library/yql/providers/generic/actors/yql_generic_source_factory.h>
+
namespace NKikimr {
namespace NMiniKQL {
@@ -55,13 +57,23 @@ TComputationNodeFactory GetKqpActorComputeFactory(TKqpScanComputeContext* comput
namespace NKqp {
-NYql::NDq::IDqAsyncIoFactory::TPtr CreateKqpAsyncIoFactory(TIntrusivePtr<TKqpCounters> counters, const NYql::IHTTPGateway::TPtr& httpGateway, const NYql::ISecuredServiceAccountCredentialsFactory::TPtr& credentialsFactory) {
+NYql::NDq::IDqAsyncIoFactory::TPtr CreateKqpAsyncIoFactory(
+ TIntrusivePtr<TKqpCounters> counters,
+ std::optional<TKqpFederatedQuerySetup> federatedQuerySetup) {
auto factory = MakeIntrusive<NYql::NDq::TDqAsyncIoFactory>();
RegisterStreamLookupActorFactory(*factory, counters);
RegisterKqpReadActor(*factory, counters);
- RegisterS3ReadActorFactory(*factory, credentialsFactory, httpGateway);
- RegisterS3WriteActorFactory(*factory, credentialsFactory, httpGateway);
RegisterSequencerActorFactory(*factory, counters);
+
+ if (federatedQuerySetup) {
+ RegisterS3ReadActorFactory(*factory, federatedQuerySetup->CredentialsFactory, federatedQuerySetup->HttpGateway);
+ RegisterS3WriteActorFactory(*factory, federatedQuerySetup->CredentialsFactory, federatedQuerySetup->HttpGateway);
+
+ if (federatedQuerySetup->ConnectorClient) {
+ RegisterGenericReadActorFactory(*factory, federatedQuerySetup->CredentialsFactory, federatedQuerySetup->ConnectorClient);
+ }
+ }
+
return factory;
}
diff --git a/ydb/core/kqp/compute_actor/kqp_compute_actor.h b/ydb/core/kqp/compute_actor/kqp_compute_actor.h
index 1911a336e61..fd4c21150b6 100644
--- a/ydb/core/kqp/compute_actor/kqp_compute_actor.h
+++ b/ydb/core/kqp/compute_actor/kqp_compute_actor.h
@@ -1,11 +1,11 @@
#pragma once
-#include <ydb/core/kqp/counters/kqp_counters.h>
+
#include <ydb/core/kqp/compute_actor/kqp_compute_events.h>
+#include <ydb/core/kqp/counters/kqp_counters.h>
+#include <ydb/core/kqp/federated_query/kqp_federated_query_helpers.h>
+#include <ydb/core/scheme/scheme_tabledefs.h>
#include <ydb/library/yql/dq/actors/compute/dq_compute_actor.h>
#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io_factory.h>
-#include <ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h>
-#include <ydb/library/yql/providers/common/token_accessor/client/factory.h>
-#include <ydb/core/scheme/scheme_tabledefs.h>
namespace NKikimr {
@@ -59,7 +59,8 @@ IActor* CreateKqpScanFetcher(const NKikimrKqp::TKqpSnapshot& snapshot, std::vect
const NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta& meta, const NYql::NDq::TComputeRuntimeSettings& settings,
const ui64 txId, const TShardsScanningPolicy& shardsScanningPolicy, TIntrusivePtr<TKqpCounters> counters, NWilson::TTraceId traceId);
-NYql::NDq::IDqAsyncIoFactory::TPtr CreateKqpAsyncIoFactory(TIntrusivePtr<TKqpCounters> counters, const NYql::IHTTPGateway::TPtr& httpGateway, const NYql::ISecuredServiceAccountCredentialsFactory::TPtr& credentialsFactory);
+NYql::NDq::IDqAsyncIoFactory::TPtr CreateKqpAsyncIoFactory(
+ TIntrusivePtr<TKqpCounters> counters, std::optional<TKqpFederatedQuerySetup> federatedQuerySetup);
} // namespace NKqp
} // namespace NKikimr
diff --git a/ydb/core/kqp/compute_actor/ya.make b/ydb/core/kqp/compute_actor/ya.make
index a2f59717f2b..570d174d739 100644
--- a/ydb/core/kqp/compute_actor/ya.make
+++ b/ydb/core/kqp/compute_actor/ya.make
@@ -17,11 +17,12 @@ SRCS(
PEERDIR(
ydb/core/actorlib_impl
ydb/core/base
+ ydb/core/kqp/federated_query
ydb/core/kqp/runtime
ydb/core/tx/datashard
ydb/core/tx/scheme_cache
ydb/library/yql/dq/actors/compute
- ydb/library/yql/providers/common/http_gateway
+ ydb/library/yql/providers/generic/actors
ydb/library/yql/providers/s3/actors
ydb/library/yql/public/issue
)
diff --git a/ydb/core/kqp/federated_query/CMakeLists.darwin-x86_64.txt b/ydb/core/kqp/federated_query/CMakeLists.darwin-x86_64.txt
new file mode 100644
index 00000000000..551388ba4e2
--- /dev/null
+++ b/ydb/core/kqp/federated_query/CMakeLists.darwin-x86_64.txt
@@ -0,0 +1,26 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(core-kqp-federated_query)
+target_compile_options(core-kqp-federated_query PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_link_libraries(core-kqp-federated_query PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ ydb-core-base
+ fq-libs-db_id_async_resolver_impl
+ fq-libs-grpc
+ library-db_pool-protos
+ providers-common-http_gateway
+ generic-connector-libcpp
+)
+target_sources(core-kqp-federated_query PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/kqp/federated_query/kqp_federated_query_helpers.cpp
+)
diff --git a/ydb/core/kqp/federated_query/CMakeLists.linux-aarch64.txt b/ydb/core/kqp/federated_query/CMakeLists.linux-aarch64.txt
new file mode 100644
index 00000000000..a662d35a7cc
--- /dev/null
+++ b/ydb/core/kqp/federated_query/CMakeLists.linux-aarch64.txt
@@ -0,0 +1,27 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(core-kqp-federated_query)
+target_compile_options(core-kqp-federated_query PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_link_libraries(core-kqp-federated_query PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ ydb-core-base
+ fq-libs-db_id_async_resolver_impl
+ fq-libs-grpc
+ library-db_pool-protos
+ providers-common-http_gateway
+ generic-connector-libcpp
+)
+target_sources(core-kqp-federated_query PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/kqp/federated_query/kqp_federated_query_helpers.cpp
+)
diff --git a/ydb/core/kqp/federated_query/CMakeLists.linux-x86_64.txt b/ydb/core/kqp/federated_query/CMakeLists.linux-x86_64.txt
new file mode 100644
index 00000000000..a662d35a7cc
--- /dev/null
+++ b/ydb/core/kqp/federated_query/CMakeLists.linux-x86_64.txt
@@ -0,0 +1,27 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(core-kqp-federated_query)
+target_compile_options(core-kqp-federated_query PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_link_libraries(core-kqp-federated_query PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ ydb-core-base
+ fq-libs-db_id_async_resolver_impl
+ fq-libs-grpc
+ library-db_pool-protos
+ providers-common-http_gateway
+ generic-connector-libcpp
+)
+target_sources(core-kqp-federated_query PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/kqp/federated_query/kqp_federated_query_helpers.cpp
+)
diff --git a/ydb/core/kqp/federated_query/CMakeLists.txt b/ydb/core/kqp/federated_query/CMakeLists.txt
new file mode 100644
index 00000000000..f8b31df0c11
--- /dev/null
+++ b/ydb/core/kqp/federated_query/CMakeLists.txt
@@ -0,0 +1,17 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-aarch64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64")
+ include(CMakeLists.darwin-x86_64.txt)
+elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA)
+ include(CMakeLists.windows-x86_64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-x86_64.txt)
+endif()
diff --git a/ydb/core/kqp/federated_query/CMakeLists.windows-x86_64.txt b/ydb/core/kqp/federated_query/CMakeLists.windows-x86_64.txt
new file mode 100644
index 00000000000..551388ba4e2
--- /dev/null
+++ b/ydb/core/kqp/federated_query/CMakeLists.windows-x86_64.txt
@@ -0,0 +1,26 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(core-kqp-federated_query)
+target_compile_options(core-kqp-federated_query PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_link_libraries(core-kqp-federated_query PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ ydb-core-base
+ fq-libs-db_id_async_resolver_impl
+ fq-libs-grpc
+ library-db_pool-protos
+ providers-common-http_gateway
+ generic-connector-libcpp
+)
+target_sources(core-kqp-federated_query PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/kqp/federated_query/kqp_federated_query_helpers.cpp
+)
diff --git a/ydb/core/kqp/federated_query/kqp_federated_query_helpers.cpp b/ydb/core/kqp/federated_query/kqp_federated_query_helpers.cpp
new file mode 100644
index 00000000000..345fc8db6cd
--- /dev/null
+++ b/ydb/core/kqp/federated_query/kqp_federated_query_helpers.cpp
@@ -0,0 +1,122 @@
+#include "kqp_federated_query_helpers.h"
+
+#include <library/cpp/actors/http/http_proxy.h>
+
+#include <ydb/core/fq/libs/actors/database_resolver.h>
+#include <ydb/core/fq/libs/actors/proxy.h>
+#include <ydb/core/fq/libs/db_id_async_resolver_impl/db_async_resolver_impl.h>
+#include <ydb/core/fq/libs/db_id_async_resolver_impl/mdb_endpoint_generator.h>
+
+namespace NKikimr::NKqp {
+
+ NYql::THttpGatewayConfig DefaultHttpGatewayConfig() {
+ NYql::THttpGatewayConfig config;
+ config.SetMaxInFlightCount(2000);
+ config.SetMaxSimulatenousDownloadsSize(2000000000);
+ config.SetBuffersSizePerStream(5000000);
+ config.SetConnectionTimeoutSeconds(15);
+ config.SetRequestTimeoutSeconds(0);
+ return config;
+ }
+
+ std::pair<TString, bool> ParseGrpcEndpoint(const TString& endpoint) {
+ TStringBuf scheme;
+ TStringBuf host;
+ TStringBuf uri;
+ NHttp::CrackURL(endpoint, scheme, host, uri);
+
+ return std::make_pair(ToString(host), scheme == "grpcs");
+ }
+
+ // TKqpFederatedQuerySetupFactoryDefault contains network clients and service actors necessary
+ // for federated queries. HTTP Gateway (required by S3 provider) is run by default even without
+ // explicit configuration. Token Accessor and Connector Client are run only if config is provided.
+ TKqpFederatedQuerySetupFactoryDefault::TKqpFederatedQuerySetupFactoryDefault(
+ NActors::TActorSystemSetup* setup,
+ const NKikimr::TAppData* appData,
+ const NKikimrConfig::TAppConfig& appConfig) {
+ const auto& queryServiceConfig = appConfig.GetQueryServiceConfig();
+
+ // Initialize HTTP Gateway
+ HttpGatewayConfig = queryServiceConfig.HasHttpGateway() ? queryServiceConfig.GetHttpGateway() : DefaultHttpGatewayConfig();
+ HttpGateway = NYql::IHTTPGateway::Make(&HttpGatewayConfig);
+
+ // Initialize Token Accessor
+ if (appConfig.GetAuthConfig().HasTokenAccessorConfig()) {
+ const auto& tokenAccessorConfig = appConfig.GetAuthConfig().GetTokenAccessorConfig();
+ TString caContent;
+ if (const auto& path = tokenAccessorConfig.GetSslCaCert()) {
+ caContent = TUnbufferedFileInput(path).ReadAll();
+ }
+
+ auto parsed = ParseGrpcEndpoint(tokenAccessorConfig.GetEndpoint());
+ CredentialsFactory = NYql::CreateSecuredServiceAccountCredentialsOverTokenAccessorFactory(
+ parsed.first,
+ parsed.second,
+ caContent,
+ tokenAccessorConfig.GetConnectionPoolSize());
+ }
+
+ // Initialize Connector client
+ if (queryServiceConfig.HasConnector()) {
+ ConnectorClient = NYql::NConnector::MakeClientGRPC(queryServiceConfig.GetConnector());
+
+ if (queryServiceConfig.HasMdbGateway()) {
+ MdbGateway = queryServiceConfig.GetMdbGateway();
+ }
+
+ if (queryServiceConfig.HasMdbTransformHost()) {
+ MdbEndpointGenerator = NFq::MakeMdbEndpointGeneratorGeneric(queryServiceConfig.GetMdbTransformHost());
+ }
+
+ // Create actors required for MDB database resolving
+ if (CredentialsFactory) {
+ auto httpProxyActor = NHttp::CreateHttpProxy();
+ auto httpProxyActorId = NFq::MakeYqlAnalyticsHttpProxyId();
+ setup->LocalServices.push_back(
+ std::make_pair(
+ httpProxyActorId,
+ TActorSetupCmd(httpProxyActor, TMailboxType::HTSwap, appData->UserPoolId)));
+
+ // FIXME: how to choose appropriate ActorID?
+ DatabaseResolverActorId = NFq::MakeDatabaseResolverActorId();
+ auto databaseResolverActor = NFq::CreateDatabaseResolver(httpProxyActorId, CredentialsFactory);
+ setup->LocalServices.push_back(
+ std::make_pair(DatabaseResolverActorId.value(),
+ TActorSetupCmd(databaseResolverActor, TMailboxType::HTSwap, appData->UserPoolId)));
+ }
+ }
+ }
+
+ std::optional<TKqpFederatedQuerySetup> TKqpFederatedQuerySetupFactoryDefault::Make(NActors::TActorSystem* actorSystem) {
+ auto result = TKqpFederatedQuerySetup{
+ HttpGateway,
+ ConnectorClient,
+ CredentialsFactory,
+ nullptr};
+
+ // Init DatabaseAsyncResolver only if all requirements are met
+ if (DatabaseResolverActorId && MdbGateway && MdbEndpointGenerator) {
+ result.DatabaseAsyncResovler = std::make_shared<NFq::TDatabaseAsyncResolverImpl>(
+ actorSystem,
+ DatabaseResolverActorId.value(),
+ "", // TODO: use YDB Gateway endpoint?
+ MdbGateway.value(),
+ MdbEndpointGenerator);
+ }
+
+ return result;
+ }
+
+ IKqpFederatedQuerySetupFactory::TPtr MakeKqpFederatedQuerySetupFactory(
+ NActors::TActorSystemSetup* setup,
+ const NKikimr::TAppData* appData,
+ const NKikimrConfig::TAppConfig& appConfig) {
+ // If Query Service is disabled, just do nothing
+ if (!appData->FeatureFlags.GetEnableScriptExecutionOperations()) {
+ return std::make_shared<TKqpFederatedQuerySetupFactoryNoop>();
+ }
+
+ return std::make_shared<NKikimr::NKqp::TKqpFederatedQuerySetupFactoryDefault>(setup, appData, appConfig);
+ }
+}
diff --git a/ydb/core/kqp/federated_query/kqp_federated_query_helpers.h b/ydb/core/kqp/federated_query/kqp_federated_query_helpers.h
new file mode 100644
index 00000000000..67b1eaecfa7
--- /dev/null
+++ b/ydb/core/kqp/federated_query/kqp_federated_query_helpers.h
@@ -0,0 +1,84 @@
+#pragma once
+
+#include <library/cpp/actors/core/actorsystem.h>
+
+#include <ydb/core/base/appdata.h>
+#include <ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver.h>
+#include <ydb/library/yql/providers/common/db_id_async_resolver/mdb_endpoint_generator.h>
+#include <ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h>
+#include <ydb/library/yql/providers/common/token_accessor/client/factory.h>
+#include <ydb/library/yql/providers/generic/connector/libcpp/client.h>
+
+namespace NKikimr::NKqp {
+
+ struct TKqpFederatedQuerySetup {
+ NYql::IHTTPGateway::TPtr HttpGateway;
+ NYql::NConnector::IClient::TPtr ConnectorClient;
+ NYql::ISecuredServiceAccountCredentialsFactory::TPtr CredentialsFactory;
+ NYql::IDatabaseAsyncResolver::TPtr DatabaseAsyncResovler;
+ };
+
+ struct IKqpFederatedQuerySetupFactory {
+ using TPtr = std::shared_ptr<IKqpFederatedQuerySetupFactory>;
+ virtual std::optional<TKqpFederatedQuerySetup> Make(NActors::TActorSystem* actorSystem) = 0;
+ virtual ~IKqpFederatedQuerySetupFactory() = default;
+ };
+
+ struct TKqpFederatedQuerySetupFactoryNoop: public IKqpFederatedQuerySetupFactory {
+ std::optional<TKqpFederatedQuerySetup> Make(NActors::TActorSystem*) override {
+ return std::nullopt;
+ }
+ };
+
+ struct TKqpFederatedQuerySetupFactoryDefault: public IKqpFederatedQuerySetupFactory {
+ TKqpFederatedQuerySetupFactoryDefault(){};
+
+ TKqpFederatedQuerySetupFactoryDefault(
+ NActors::TActorSystemSetup* setup,
+ const NKikimr::TAppData* appData,
+ const NKikimrConfig::TAppConfig& appConfig);
+
+ std::optional<TKqpFederatedQuerySetup> Make(NActors::TActorSystem* actorSystem) override;
+
+ private:
+ NYql::THttpGatewayConfig HttpGatewayConfig;
+ NYql::IHTTPGateway::TPtr HttpGateway;
+ NYql::ISecuredServiceAccountCredentialsFactory::TPtr CredentialsFactory;
+ NYql::NConnector::IClient::TPtr ConnectorClient;
+ std::optional<NActors::TActorId> DatabaseResolverActorId;
+ NYql::IMdbEndpointGenerator::TPtr MdbEndpointGenerator;
+ std::optional<TString> MdbGateway;
+ };
+
+ struct TKqpFederatedQuerySetupFactoryMock: public IKqpFederatedQuerySetupFactory {
+ TKqpFederatedQuerySetupFactoryMock() = delete;
+
+ TKqpFederatedQuerySetupFactoryMock(
+ NYql::IHTTPGateway::TPtr httpGateway,
+ NYql::NConnector::IClient::TPtr connectorClient,
+ NYql::ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory,
+ NYql::IDatabaseAsyncResolver::TPtr databaseAsyncResovler)
+ : HttpGateway(httpGateway)
+ , ConnectorClient(connectorClient)
+ , CredentialsFactory(credentialsFactory)
+ , DatabaseAsyncResovler(databaseAsyncResovler)
+ {
+ }
+
+ std::optional<TKqpFederatedQuerySetup> Make(NActors::TActorSystem*) override {
+ return TKqpFederatedQuerySetup{
+ HttpGateway, ConnectorClient, CredentialsFactory, DatabaseAsyncResovler};
+ }
+
+ private:
+ NYql::IHTTPGateway::TPtr HttpGateway;
+ NYql::NConnector::IClient::TPtr ConnectorClient;
+ NYql::ISecuredServiceAccountCredentialsFactory::TPtr CredentialsFactory;
+ NYql::IDatabaseAsyncResolver::TPtr DatabaseAsyncResovler;
+ };
+
+ IKqpFederatedQuerySetupFactory::TPtr MakeKqpFederatedQuerySetupFactory(
+ NActors::TActorSystemSetup* setup,
+ const NKikimr::TAppData* appData,
+ const NKikimrConfig::TAppConfig& config);
+}
diff --git a/ydb/core/kqp/federated_query/ya.make b/ydb/core/kqp/federated_query/ya.make
new file mode 100644
index 00000000000..4af74496ca6
--- /dev/null
+++ b/ydb/core/kqp/federated_query/ya.make
@@ -0,0 +1,18 @@
+LIBRARY()
+
+SRCS(
+ kqp_federated_query_helpers.cpp
+)
+
+PEERDIR(
+ ydb/core/base
+ ydb/core/fq/libs/db_id_async_resolver_impl
+ ydb/core/fq/libs/grpc
+ ydb/library/db_pool/protos
+ ydb/library/yql/providers/common/http_gateway
+ ydb/library/yql/providers/generic/connector/libcpp
+)
+
+YQL_LAST_ABI_VERSION()
+
+END()
diff --git a/ydb/core/kqp/host/CMakeLists.darwin-x86_64.txt b/ydb/core/kqp/host/CMakeLists.darwin-x86_64.txt
index 95d8c5b2728..1170ac5cd84 100644
--- a/ydb/core/kqp/host/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/kqp/host/CMakeLists.darwin-x86_64.txt
@@ -16,6 +16,7 @@ target_link_libraries(core-kqp-host PUBLIC
yutil
ydb-core-base
core-kqp-common
+ core-kqp-federated_query
core-kqp-opt
core-kqp-provider
tx-long_tx_service-public
@@ -27,6 +28,7 @@ target_link_libraries(core-kqp-host PUBLIC
providers-common-http_gateway
providers-common-udf_resolve
yql-providers-config
+ providers-generic-provider
providers-result-provider
providers-s3-provider
)
diff --git a/ydb/core/kqp/host/CMakeLists.linux-aarch64.txt b/ydb/core/kqp/host/CMakeLists.linux-aarch64.txt
index bec5bec644d..4955fb9ec50 100644
--- a/ydb/core/kqp/host/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/kqp/host/CMakeLists.linux-aarch64.txt
@@ -17,6 +17,7 @@ target_link_libraries(core-kqp-host PUBLIC
yutil
ydb-core-base
core-kqp-common
+ core-kqp-federated_query
core-kqp-opt
core-kqp-provider
tx-long_tx_service-public
@@ -28,6 +29,7 @@ target_link_libraries(core-kqp-host PUBLIC
providers-common-http_gateway
providers-common-udf_resolve
yql-providers-config
+ providers-generic-provider
providers-result-provider
providers-s3-provider
)
diff --git a/ydb/core/kqp/host/CMakeLists.linux-x86_64.txt b/ydb/core/kqp/host/CMakeLists.linux-x86_64.txt
index bec5bec644d..4955fb9ec50 100644
--- a/ydb/core/kqp/host/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/kqp/host/CMakeLists.linux-x86_64.txt
@@ -17,6 +17,7 @@ target_link_libraries(core-kqp-host PUBLIC
yutil
ydb-core-base
core-kqp-common
+ core-kqp-federated_query
core-kqp-opt
core-kqp-provider
tx-long_tx_service-public
@@ -28,6 +29,7 @@ target_link_libraries(core-kqp-host PUBLIC
providers-common-http_gateway
providers-common-udf_resolve
yql-providers-config
+ providers-generic-provider
providers-result-provider
providers-s3-provider
)
diff --git a/ydb/core/kqp/host/CMakeLists.windows-x86_64.txt b/ydb/core/kqp/host/CMakeLists.windows-x86_64.txt
index 95d8c5b2728..1170ac5cd84 100644
--- a/ydb/core/kqp/host/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/kqp/host/CMakeLists.windows-x86_64.txt
@@ -16,6 +16,7 @@ target_link_libraries(core-kqp-host PUBLIC
yutil
ydb-core-base
core-kqp-common
+ core-kqp-federated_query
core-kqp-opt
core-kqp-provider
tx-long_tx_service-public
@@ -27,6 +28,7 @@ target_link_libraries(core-kqp-host PUBLIC
providers-common-http_gateway
providers-common-udf_resolve
yql-providers-config
+ providers-generic-provider
providers-result-provider
providers-s3-provider
)
diff --git a/ydb/core/kqp/host/kqp_host.cpp b/ydb/core/kqp/host/kqp_host.cpp
index 93f08ca2237..2af39a3c389 100644
--- a/ydb/core/kqp/host/kqp_host.cpp
+++ b/ydb/core/kqp/host/kqp_host.cpp
@@ -18,6 +18,8 @@
#include <ydb/library/yql/providers/common/provider/yql_provider_names.h>
#include <ydb/library/yql/providers/common/udf_resolve/yql_simple_udf_resolver.h>
#include <ydb/library/yql/providers/s3/provider/yql_s3_provider.h>
+#include <ydb/library/yql/providers/generic/provider/yql_generic_provider.h>
+#include <ydb/library/yql/providers/generic/provider/yql_generic_state.h>
#include <ydb/library/yql/minikql/invoke_builtins/mkql_builtins.h>
#include <ydb/library/yql/sql/sql.h>
@@ -891,17 +893,16 @@ class TKqpHost : public IKqpHost {
public:
TKqpHost(TIntrusivePtr<IKqpGateway> gateway, const TString& cluster, const TString& database,
TKikimrConfiguration::TPtr config, IModuleResolver::TPtr moduleResolver,
- NYql::IHTTPGateway::TPtr httpGateway,
+ std::optional<TKqpFederatedQuerySetup> federatedQuerySetup,
const NKikimr::NMiniKQL::IFunctionRegistry* funcRegistry, bool keepConfigChanges,
- bool isInternalCall, NYql::ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, TKqpTempTablesState::TConstPtr tempTablesState = nullptr)
+ bool isInternalCall, TKqpTempTablesState::TConstPtr tempTablesState = nullptr)
: Gateway(gateway)
, Cluster(cluster)
, ExprCtx(new TExprContext())
, ModuleResolver(moduleResolver)
, KeepConfigChanges(keepConfigChanges)
, IsInternalCall(isInternalCall)
- , CredentialsFactory(std::move(credentialsFactory))
- , HttpGateway(std::move(httpGateway))
+ , FederatedQuerySetup(federatedQuerySetup)
, SessionCtx(new TKikimrSessionContext(funcRegistry, config, TAppData::TimeProvider, TAppData::RandomProvider))
, ClustersMap({{Cluster, TString(KikimrProviderName)}})
, TypesCtx(MakeIntrusive<TTypeAnnotationContext>())
@@ -1495,7 +1496,7 @@ private:
auto state = MakeIntrusive<NYql::TS3State>();
state->Types = TypesCtx.Get();
state->FunctionRegistry = FuncRegistry;
- state->CredentialsFactory = CredentialsFactory;
+ state->CredentialsFactory = FederatedQuerySetup->CredentialsFactory;
//
// TODO: Use TS3GatewayConfig from Kikimr Config when added
@@ -1509,16 +1510,34 @@ private:
}
state->Configuration->Init(cfg, TypesCtx);
- auto dataSource = NYql::CreateS3DataSource(state, HttpGateway);
- auto dataSink = NYql::CreateS3DataSink(state, HttpGateway);
+ auto dataSource = NYql::CreateS3DataSource(state, FederatedQuerySetup->HttpGateway);
+ auto dataSink = NYql::CreateS3DataSink(state, FederatedQuerySetup->HttpGateway);
TypesCtx->AddDataSource(NYql::S3ProviderName, std::move(dataSource));
TypesCtx->AddDataSink(NYql::S3ProviderName, std::move(dataSink));
}
+ void InitGenericProvider() {
+ if (!FederatedQuerySetup->ConnectorClient) {
+ return;
+ }
+
+ auto state = MakeIntrusive<NYql::TGenericState>(
+ TypesCtx.Get(),
+ FuncRegistry,
+ FederatedQuerySetup->DatabaseAsyncResovler,
+ FederatedQuerySetup->ConnectorClient,
+ nullptr
+ );
+
+ TypesCtx->AddDataSource(NYql::GenericProviderName, NYql::CreateGenericDataSource(state));
+ TypesCtx->AddDataSink(NYql::GenericProviderName, NYql::CreateGenericDataSink(state));
+ }
+
void Init(EKikimrQueryType queryType) {
- if (queryType == EKikimrQueryType::Script || queryType == EKikimrQueryType::Query) {
+ if ((queryType == EKikimrQueryType::Script || queryType == EKikimrQueryType::Query) && FederatedQuerySetup) {
InitS3Provider();
+ InitGenericProvider();
}
KqpRunner = CreateKqpRunner(Gateway, Cluster, TypesCtx, SessionCtx, *FuncRegistry, TAppData::TimeProvider, TAppData::RandomProvider);
@@ -1642,8 +1661,7 @@ private:
IModuleResolver::TPtr ModuleResolver;
bool KeepConfigChanges;
bool IsInternalCall;
- NYql::ISecuredServiceAccountCredentialsFactory::TPtr CredentialsFactory;
- NYql::IHTTPGateway::TPtr HttpGateway;
+ std::optional<TKqpFederatedQuerySetup> FederatedQuerySetup;
TIntrusivePtr<TKikimrSessionContext> SessionCtx;
THashMap<TString, TString> ClustersMap;
@@ -1683,11 +1701,12 @@ Ydb::Table::QueryStatsCollection::Mode GetStatsMode(NYql::EKikimrStatsMode stats
TIntrusivePtr<IKqpHost> CreateKqpHost(TIntrusivePtr<IKqpGateway> gateway,
const TString& cluster, const TString& database, TKikimrConfiguration::TPtr config, IModuleResolver::TPtr moduleResolver,
- NYql::IHTTPGateway::TPtr httpGateway, const NKikimr::NMiniKQL::IFunctionRegistry* funcRegistry, bool keepConfigChanges, bool isInternalCall,
- NYql::ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, TKqpTempTablesState::TConstPtr tempTablesState)
+ std::optional<TKqpFederatedQuerySetup> federatedQuerySetup,
+ const NKikimr::NMiniKQL::IFunctionRegistry* funcRegistry, bool keepConfigChanges, bool isInternalCall,
+ TKqpTempTablesState::TConstPtr tempTablesState)
{
- return MakeIntrusive<TKqpHost>(gateway, cluster, database, config, moduleResolver, std::move(httpGateway), funcRegistry,
- keepConfigChanges, isInternalCall, std::move(credentialsFactory), std::move(tempTablesState));
+ return MakeIntrusive<TKqpHost>(gateway, cluster, database, config, moduleResolver, federatedQuerySetup, funcRegistry,
+ keepConfigChanges, isInternalCall, std::move(tempTablesState));
}
} // namespace NKqp
diff --git a/ydb/core/kqp/host/kqp_host.h b/ydb/core/kqp/host/kqp_host.h
index 644a1dfc085..4eaebf1558b 100644
--- a/ydb/core/kqp/host/kqp_host.h
+++ b/ydb/core/kqp/host/kqp_host.h
@@ -1,5 +1,6 @@
#pragma once
+#include <ydb/core/kqp/federated_query/kqp_federated_query_helpers.h>
#include <ydb/core/kqp/gateway/kqp_gateway.h>
#include <ydb/core/kqp/provider/yql_kikimr_provider.h>
#include <ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h>
@@ -100,8 +101,8 @@ public:
TIntrusivePtr<IKqpHost> CreateKqpHost(TIntrusivePtr<IKqpGateway> gateway,
const TString& cluster, const TString& database, NYql::TKikimrConfiguration::TPtr config, NYql::IModuleResolver::TPtr moduleResolver,
- NYql::IHTTPGateway::TPtr httpGateway, const NKikimr::NMiniKQL::IFunctionRegistry* funcRegistry = nullptr, bool keepConfigChanges = false, bool isInternalCall = false,
- NYql::ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory = nullptr, TKqpTempTablesState::TConstPtr tempTablesState = nullptr);
+ std::optional<TKqpFederatedQuerySetup> federatedQuerySetup, const NKikimr::NMiniKQL::IFunctionRegistry* funcRegistry = nullptr,
+ bool keepConfigChanges = false, bool isInternalCall = false, TKqpTempTablesState::TConstPtr tempTablesState = nullptr);
} // namespace NKqp
} // namespace NKikimr
diff --git a/ydb/core/kqp/host/ya.make b/ydb/core/kqp/host/ya.make
index 7f722836aee..0b7ff0368bd 100644
--- a/ydb/core/kqp/host/ya.make
+++ b/ydb/core/kqp/host/ya.make
@@ -12,6 +12,7 @@ SRCS(
PEERDIR(
ydb/core/base
ydb/core/kqp/common
+ ydb/core/kqp/federated_query
ydb/core/kqp/opt
ydb/core/kqp/provider
ydb/core/tx/long_tx_service/public
@@ -23,6 +24,7 @@ PEERDIR(
ydb/library/yql/providers/common/http_gateway
ydb/library/yql/providers/common/udf_resolve
ydb/library/yql/providers/config
+ ydb/library/yql/providers/generic/provider
ydb/library/yql/providers/result/provider
ydb/library/yql/providers/s3/provider
)
diff --git a/ydb/core/kqp/node_service/kqp_node_ut.cpp b/ydb/core/kqp/node_service/kqp_node_ut.cpp
index e4380cf0621..a856f199a92 100644
--- a/ydb/core/kqp/node_service/kqp_node_ut.cpp
+++ b/ydb/core/kqp/node_service/kqp_node_ut.cpp
@@ -184,8 +184,8 @@ public:
Runtime->EnableScheduleForActor(ResourceManagerActorId, true);
WaitForBootstrap();
- auto httpGateway = NYql::IHTTPGateway::Make();
- auto asyncIoFactory = CreateKqpAsyncIoFactory(KqpCounters, httpGateway, nullptr);
+ auto FederatedQuerySetup = std::make_optional<TKqpFederatedQuerySetup>({NYql::IHTTPGateway::Make(), nullptr, nullptr, nullptr});
+ auto asyncIoFactory = CreateKqpAsyncIoFactory(KqpCounters, FederatedQuerySetup);
auto kqpNode = CreateKqpNodeService(config, KqpCounters, CompFactory.Get(), asyncIoFactory);
KqpNodeActorId = Runtime->Register(kqpNode);
Runtime->EnableScheduleForActor(KqpNodeActorId, true);
diff --git a/ydb/core/kqp/proxy_service/CMakeLists.darwin-x86_64.txt b/ydb/core/kqp/proxy_service/CMakeLists.darwin-x86_64.txt
index 74fe7fa6156..f8582de4c37 100644
--- a/ydb/core/kqp/proxy_service/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/kqp/proxy_service/CMakeLists.darwin-x86_64.txt
@@ -34,6 +34,7 @@ target_link_libraries(core-kqp-proxy_service PUBLIC
core-tx-schemeshard
ydb-library-query_actor
providers-common-http_gateway
+ providers-common-proto
yql-public-issue
api-protos
public-lib-operation_id
diff --git a/ydb/core/kqp/proxy_service/CMakeLists.linux-aarch64.txt b/ydb/core/kqp/proxy_service/CMakeLists.linux-aarch64.txt
index 87e67e8adfe..8469df07477 100644
--- a/ydb/core/kqp/proxy_service/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/kqp/proxy_service/CMakeLists.linux-aarch64.txt
@@ -35,6 +35,7 @@ target_link_libraries(core-kqp-proxy_service PUBLIC
core-tx-schemeshard
ydb-library-query_actor
providers-common-http_gateway
+ providers-common-proto
yql-public-issue
api-protos
public-lib-operation_id
diff --git a/ydb/core/kqp/proxy_service/CMakeLists.linux-x86_64.txt b/ydb/core/kqp/proxy_service/CMakeLists.linux-x86_64.txt
index 87e67e8adfe..8469df07477 100644
--- a/ydb/core/kqp/proxy_service/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/kqp/proxy_service/CMakeLists.linux-x86_64.txt
@@ -35,6 +35,7 @@ target_link_libraries(core-kqp-proxy_service PUBLIC
core-tx-schemeshard
ydb-library-query_actor
providers-common-http_gateway
+ providers-common-proto
yql-public-issue
api-protos
public-lib-operation_id
diff --git a/ydb/core/kqp/proxy_service/CMakeLists.windows-x86_64.txt b/ydb/core/kqp/proxy_service/CMakeLists.windows-x86_64.txt
index 74fe7fa6156..f8582de4c37 100644
--- a/ydb/core/kqp/proxy_service/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/kqp/proxy_service/CMakeLists.windows-x86_64.txt
@@ -34,6 +34,7 @@ target_link_libraries(core-kqp-proxy_service PUBLIC
core-tx-schemeshard
ydb-library-query_actor
providers-common-http_gateway
+ providers-common-proto
yql-public-issue
api-protos
public-lib-operation_id
diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp
index e3f6fd077db..c42ad4a89c9 100644
--- a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp
+++ b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp
@@ -105,16 +105,6 @@ TString EncodeSessionId(ui32 nodeId, const TString& id) {
return NOperationId::ProtoToString(opId);
}
-void ParseGrpcEndpoint(const TString& endpoint, TString& address, bool& useSsl) {
- TStringBuf scheme;
- TStringBuf host;
- TStringBuf uri;
- NHttp::CrackURL(endpoint, scheme, host, uri);
-
- address = ToString(host);
- useSsl = scheme == "grpcs";
-}
-
class TKqpProxyService : public TActorBootstrapped<TKqpProxyService> {
struct TEvPrivate {
enum EEv {
@@ -169,43 +159,30 @@ public:
TKqpProxyService(const NKikimrConfig::TLogConfig& logConfig,
const NKikimrConfig::TTableServiceConfig& tableServiceConfig,
- const NKikimrProto::TTokenAccessorConfig& tokenAccessorConfig,
const NKikimrConfig::TQueryServiceConfig& queryServiceConfig,
const NKikimrConfig::TMetadataProviderConfig& metadataProviderConfig,
TVector<NKikimrKqp::TKqpSetting>&& settings,
std::shared_ptr<IQueryReplayBackendFactory> queryReplayFactory,
- std::shared_ptr<TKqpProxySharedResources>&& kqpProxySharedResources)
- : HttpGatewayConfig(CreateHttpGatewayConfig())
- , LogConfig(logConfig)
+ std::shared_ptr<TKqpProxySharedResources>&& kqpProxySharedResources,
+ IKqpFederatedQuerySetupFactory::TPtr federatedQuerySetupFactory
+ ): LogConfig(logConfig)
, TableServiceConfig(tableServiceConfig)
- , TokenAccessorConfig(tokenAccessorConfig)
, QueryServiceConfig(queryServiceConfig)
, MetadataProviderConfig(metadataProviderConfig)
, KqpSettings(std::make_shared<const TKqpSettings>(std::move(settings)))
+ , FederatedQuerySetupFactory(federatedQuerySetupFactory)
, QueryReplayFactory(std::move(queryReplayFactory))
- , HttpGateway(NYql::IHTTPGateway::Make(&HttpGatewayConfig)) // TODO: pass config and counters
, PendingRequests()
, ModuleResolverState()
, KqpProxySharedResources(std::move(kqpProxySharedResources))
{}
void Bootstrap(const TActorContext &ctx) {
- if (TokenAccessorConfig.GetEnabled()) {
- TString caContent;
- if (const auto& path = TokenAccessorConfig.GetSslCaCert()) {
- caContent = TUnbufferedFileInput(path).ReadAll();
- }
-
- TString endpointAddress;
- bool useSsl = false;
- ParseGrpcEndpoint(TokenAccessorConfig.GetEndpoint(), endpointAddress, useSsl);
-
- CredentialsFactory = NYql::CreateSecuredServiceAccountCredentialsOverTokenAccessorFactory(endpointAddress, useSsl, caContent, TokenAccessorConfig.GetConnectionPoolSize());
- }
-
NLwTraceMonPage::ProbeRegistry().AddProbesList(LWTRACE_GET_PROBES(KQP_PROVIDER));
Counters = MakeIntrusive<TKqpCounters>(AppData()->Counters, &TlsActivationContext->AsActorContext());
- AsyncIoFactory = CreateKqpAsyncIoFactory(Counters, HttpGateway, CredentialsFactory);
+ // NOTE: some important actors are constructed within next call
+ FederatedQuerySetup = FederatedQuerySetupFactory->Make(ctx.ActorSystem());
+ AsyncIoFactory = CreateKqpAsyncIoFactory(Counters, FederatedQuerySetup);
ModuleResolverState = MakeIntrusive<TModuleResolverState>();
LocalSessions = std::make_unique<TLocalSessionsRegistry>(AppData()->RandomProvider);
@@ -241,7 +218,7 @@ public:
// Create compile service
CompileService = TlsActivationContext->ExecutorThread.RegisterActor(CreateKqpCompileService(TableServiceConfig, MetadataProviderConfig,
- KqpSettings, ModuleResolverState, Counters, std::move(QueryReplayFactory), CredentialsFactory, HttpGateway));
+ KqpSettings, ModuleResolverState, Counters, std::move(QueryReplayFactory), FederatedQuerySetup));
TlsActivationContext->ExecutorThread.ActorSystem->RegisterLocalService(
MakeKqpCompileServiceID(SelfId().NodeId()), CompileService);
@@ -1365,7 +1342,7 @@ private:
auto config = CreateConfig(KqpSettings, workerSettings);
- IActor* sessionActor = CreateKqpSessionActor(SelfId(), sessionId, KqpSettings, workerSettings, HttpGateway, AsyncIoFactory, CredentialsFactory, ModuleResolverState, Counters, MetadataProviderConfig);
+ IActor* sessionActor = CreateKqpSessionActor(SelfId(), sessionId, KqpSettings, workerSettings, FederatedQuerySetup, AsyncIoFactory, ModuleResolverState, Counters, MetadataProviderConfig);
auto workerId = TlsActivationContext->ExecutorThread.RegisterActor(sessionActor, TMailboxType::HTSwap, AppData()->UserPoolId);
TKqpSessionInfo* sessionInfo = LocalSessions->Create(
sessionId, workerId, database, dbCounters, supportsBalancing, GetSessionIdleDuration());
@@ -1545,27 +1522,16 @@ private:
}
}
- static NYql::THttpGatewayConfig CreateHttpGatewayConfig() {
- NYql::THttpGatewayConfig config;
- config.SetMaxInFlightCount(2000);
- config.SetMaxSimulatenousDownloadsSize(2000000000);
- config.SetBuffersSizePerStream(5000000);
- config.SetConnectionTimeoutSeconds(15);
- config.SetRequestTimeoutSeconds(0);
- return config;
- }
-
private:
- NYql::THttpGatewayConfig HttpGatewayConfig;
NKikimrConfig::TLogConfig LogConfig;
NKikimrConfig::TTableServiceConfig TableServiceConfig;
- NKikimrProto::TTokenAccessorConfig TokenAccessorConfig;
NKikimrConfig::TQueryServiceConfig QueryServiceConfig;
NKikimrConfig::TMetadataProviderConfig MetadataProviderConfig;
TKqpSettings::TConstPtr KqpSettings;
- NYql::ISecuredServiceAccountCredentialsFactory::TPtr CredentialsFactory;
+ IKqpFederatedQuerySetupFactory::TPtr FederatedQuerySetupFactory;
+ std::optional<TKqpFederatedQuerySetup> FederatedQuerySetup;
std::shared_ptr<IQueryReplayBackendFactory> QueryReplayFactory;
- NYql::IHTTPGateway::TPtr HttpGateway;
+ NYql::NConnector::IClient::TPtr ConnectorClient;
std::optional<TPeerStats> PeerStats;
TKqpProxyRequestTracker PendingRequests;
@@ -1614,15 +1580,16 @@ private:
IActor* CreateKqpProxyService(const NKikimrConfig::TLogConfig& logConfig,
const NKikimrConfig::TTableServiceConfig& tableServiceConfig,
- const NKikimrProto::TTokenAccessorConfig& tokenAccessorConfig,
const NKikimrConfig::TQueryServiceConfig& queryServiceConfig,
const NKikimrConfig::TMetadataProviderConfig& metadataProviderConfig,
TVector<NKikimrKqp::TKqpSetting>&& settings,
std::shared_ptr<IQueryReplayBackendFactory> queryReplayFactory,
- std::shared_ptr<TKqpProxySharedResources> kqpProxySharedResources)
+ std::shared_ptr<TKqpProxySharedResources> kqpProxySharedResources,
+ IKqpFederatedQuerySetupFactory::TPtr federatedQuerySetupFactory
+ )
{
- return new TKqpProxyService(logConfig, tableServiceConfig, tokenAccessorConfig, queryServiceConfig, metadataProviderConfig, std::move(settings),
- std::move(queryReplayFactory),std::move(kqpProxySharedResources));
+ return new TKqpProxyService(logConfig, tableServiceConfig, queryServiceConfig, metadataProviderConfig, std::move(settings),
+ std::move(queryReplayFactory), std::move(kqpProxySharedResources), std::move(federatedQuerySetupFactory));
}
} // namespace NKikimr::NKqp
diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_service.h b/ydb/core/kqp/proxy_service/kqp_proxy_service.h
index 648a6010e7d..c070b4df35f 100644
--- a/ydb/core/kqp/proxy_service/kqp_proxy_service.h
+++ b/ydb/core/kqp/proxy_service/kqp_proxy_service.h
@@ -1,11 +1,13 @@
#pragma once
#include <ydb/core/base/appdata.h>
+#include <ydb/core/kqp/federated_query/kqp_federated_query_helpers.h>
#include <library/cpp/actors/core/actorid.h>
#include <util/datetime/base.h>
+
namespace NKikimr::NKqp {
class IQueryReplayBackendFactory;
@@ -49,11 +51,12 @@ TPeerStats CalcPeerStats(const TVector<NKikimrKqp::TKqpProxyNodeResources>& data
IActor* CreateKqpProxyService(const NKikimrConfig::TLogConfig& logConfig,
const NKikimrConfig::TTableServiceConfig& tableServiceConfig,
- const NKikimrProto::TTokenAccessorConfig& tokenAccessorConfig,
const NKikimrConfig::TQueryServiceConfig& queryServiceConfig,
const NKikimrConfig::TMetadataProviderConfig& metadataProviderConfig,
TVector<NKikimrKqp::TKqpSetting>&& settings,
std::shared_ptr<IQueryReplayBackendFactory> queryReplayFactory,
- std::shared_ptr<TKqpProxySharedResources> kqpProxySharedResources);
+ std::shared_ptr<TKqpProxySharedResources> kqpProxySharedResources,
+ IKqpFederatedQuerySetupFactory::TPtr federatedQuerySetupFactory
+ );
} // namespace NKikimr::NKqp
diff --git a/ydb/core/kqp/proxy_service/ya.make b/ydb/core/kqp/proxy_service/ya.make
index f7f95f81fa2..cef6cbd6640 100644
--- a/ydb/core/kqp/proxy_service/ya.make
+++ b/ydb/core/kqp/proxy_service/ya.make
@@ -26,6 +26,7 @@ PEERDIR(
ydb/core/tx/schemeshard
ydb/library/query_actor
ydb/library/yql/providers/common/http_gateway
+ ydb/library/yql/providers/common/proto
ydb/library/yql/public/issue
ydb/public/api/protos
ydb/public/lib/operation_id
diff --git a/ydb/core/kqp/session_actor/CMakeLists.darwin-x86_64.txt b/ydb/core/kqp/session_actor/CMakeLists.darwin-x86_64.txt
index ec5779b5b4a..7d60b1ef4fa 100644
--- a/ydb/core/kqp/session_actor/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/kqp/session_actor/CMakeLists.darwin-x86_64.txt
@@ -16,7 +16,7 @@ target_link_libraries(core-kqp-session_actor PUBLIC
yutil
ydb-core-docapi
core-kqp-common
- providers-common-http_gateway
+ core-kqp-federated_query
public-lib-operation_id
)
target_sources(core-kqp-session_actor PRIVATE
diff --git a/ydb/core/kqp/session_actor/CMakeLists.linux-aarch64.txt b/ydb/core/kqp/session_actor/CMakeLists.linux-aarch64.txt
index c6febd786ae..cbb38f3df3e 100644
--- a/ydb/core/kqp/session_actor/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/kqp/session_actor/CMakeLists.linux-aarch64.txt
@@ -17,7 +17,7 @@ target_link_libraries(core-kqp-session_actor PUBLIC
yutil
ydb-core-docapi
core-kqp-common
- providers-common-http_gateway
+ core-kqp-federated_query
public-lib-operation_id
)
target_sources(core-kqp-session_actor PRIVATE
diff --git a/ydb/core/kqp/session_actor/CMakeLists.linux-x86_64.txt b/ydb/core/kqp/session_actor/CMakeLists.linux-x86_64.txt
index c6febd786ae..cbb38f3df3e 100644
--- a/ydb/core/kqp/session_actor/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/kqp/session_actor/CMakeLists.linux-x86_64.txt
@@ -17,7 +17,7 @@ target_link_libraries(core-kqp-session_actor PUBLIC
yutil
ydb-core-docapi
core-kqp-common
- providers-common-http_gateway
+ core-kqp-federated_query
public-lib-operation_id
)
target_sources(core-kqp-session_actor PRIVATE
diff --git a/ydb/core/kqp/session_actor/CMakeLists.windows-x86_64.txt b/ydb/core/kqp/session_actor/CMakeLists.windows-x86_64.txt
index ec5779b5b4a..7d60b1ef4fa 100644
--- a/ydb/core/kqp/session_actor/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/kqp/session_actor/CMakeLists.windows-x86_64.txt
@@ -16,7 +16,7 @@ target_link_libraries(core-kqp-session_actor PUBLIC
yutil
ydb-core-docapi
core-kqp-common
- providers-common-http_gateway
+ core-kqp-federated_query
public-lib-operation_id
)
target_sources(core-kqp-session_actor PRIVATE
diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
index a5060c0ad79..d0705e0132a 100644
--- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp
+++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
@@ -149,19 +149,18 @@ public:
}
TKqpSessionActor(const TActorId& owner, const TString& sessionId, const TKqpSettings::TConstPtr& kqpSettings,
- const TKqpWorkerSettings& workerSettings, NYql::IHTTPGateway::TPtr httpGateway,
+ const TKqpWorkerSettings& workerSettings,
+ std::optional<TKqpFederatedQuerySetup> federatedQuerySetup,
NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory,
- NYql::ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory,
TIntrusivePtr<TModuleResolverState> moduleResolverState, TIntrusivePtr<TKqpCounters> counters,
const NKikimrConfig::TMetadataProviderConfig& metadataProviderConfig)
: Owner(owner)
, SessionId(sessionId)
, Counters(counters)
, Settings(workerSettings)
- , HttpGateway(std::move(httpGateway))
, AsyncIoFactory(std::move(asyncIoFactory))
- , CredentialsFactory(std::move(credentialsFactory))
, ModuleResolverState(std::move(moduleResolverState))
+ , FederatedQuerySetup(federatedQuerySetup)
, KqpSettings(kqpSettings)
, Config(CreateConfig(kqpSettings, workerSettings))
, Transactions(*Config->_KqpMaxActiveTxPerSession.Get(), TDuration::Seconds(*Config->_KqpTxIdleTimeoutSec.Get()))
@@ -222,7 +221,7 @@ public:
void ForwardRequest(TEvKqp::TEvQueryRequest::TPtr& ev) {
if (!WorkerId) {
std::unique_ptr<IActor> workerActor(CreateKqpWorkerActor(SelfId(), SessionId, KqpSettings, Settings,
- HttpGateway, ModuleResolverState, Counters, CredentialsFactory, MetadataProviderConfig));
+ FederatedQuerySetup, ModuleResolverState, Counters, MetadataProviderConfig));
WorkerId = RegisterWithSameMailbox(workerActor.release());
}
TlsActivationContext->Send(new IEventHandle(*WorkerId, SelfId(), QueryState->RequestEv.release(), ev->Flags, ev->Cookie,
@@ -2131,10 +2130,9 @@ private:
TIntrusivePtr<TKqpCounters> Counters;
TIntrusivePtr<TKqpRequestCounters> RequestCounters;
TKqpWorkerSettings Settings;
- NYql::IHTTPGateway::TPtr HttpGateway;
NYql::NDq::IDqAsyncIoFactory::TPtr AsyncIoFactory;
- NYql::ISecuredServiceAccountCredentialsFactory::TPtr CredentialsFactory;
TIntrusivePtr<TModuleResolverState> ModuleResolverState;
+ std::optional<TKqpFederatedQuerySetup> FederatedQuerySetup;
TKqpSettings::TConstPtr KqpSettings;
std::optional<TActorId> WorkerId;
TActorId ExecuterId;
@@ -2159,12 +2157,15 @@ private:
IActor* CreateKqpSessionActor(const TActorId& owner, const TString& sessionId,
const TKqpSettings::TConstPtr& kqpSettings, const TKqpWorkerSettings& workerSettings,
- NYql::IHTTPGateway::TPtr httpGateway, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory,
- NYql::ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory,
+ std::optional<TKqpFederatedQuerySetup> federatedQuerySetup,
+ NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory,
TIntrusivePtr<TModuleResolverState> moduleResolverState, TIntrusivePtr<TKqpCounters> counters,
const NKikimrConfig::TMetadataProviderConfig& metadataProviderConfig)
{
- return new TKqpSessionActor(owner, sessionId, kqpSettings, workerSettings, std::move(httpGateway), std::move(asyncIoFactory), std::move(credentialsFactory), std::move(moduleResolverState), counters, metadataProviderConfig);
+ return new TKqpSessionActor(owner, sessionId, kqpSettings, workerSettings, federatedQuerySetup,
+ std::move(asyncIoFactory), std::move(moduleResolverState), counters,
+ metadataProviderConfig
+ );
}
}
diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.h b/ydb/core/kqp/session_actor/kqp_session_actor.h
index fd6f9fd381b..548bce979a7 100644
--- a/ydb/core/kqp/session_actor/kqp_session_actor.h
+++ b/ydb/core/kqp/session_actor/kqp_session_actor.h
@@ -1,11 +1,10 @@
#pragma once
+#include <ydb/core/kqp/common/simple/temp_tables.h>
#include <ydb/core/kqp/counters/kqp_counters.h>
+#include <ydb/core/kqp/federated_query/kqp_federated_query_helpers.h>
#include <ydb/core/kqp/gateway/kqp_gateway.h>
#include <ydb/core/protos/config.pb.h>
-#include <ydb/core/kqp/common/simple/temp_tables.h>
-#include <ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h>
-#include <ydb/library/yql/providers/common/token_accessor/client/factory.h>
#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io_factory.h>
#include <library/cpp/actors/core/actorid.h>
@@ -35,10 +34,11 @@ struct TKqpWorkerSettings {
IActor* CreateKqpSessionActor(const TActorId& owner, const TString& sessionId,
const TKqpSettings::TConstPtr& kqpSettings, const TKqpWorkerSettings& workerSettings,
- NYql::IHTTPGateway::TPtr httpGateway, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory,
- NYql::ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory,
+ std::optional<TKqpFederatedQuerySetup> federatedQuerySetup,
+ NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory,
TIntrusivePtr<TModuleResolverState> moduleResolverState, TIntrusivePtr<TKqpCounters> counters,
- const NKikimrConfig::TMetadataProviderConfig& metadataProviderConfig);
+ const NKikimrConfig::TMetadataProviderConfig& metadataProviderConfig
+ );
IActor* CreateKqpTempTablesManager(TKqpTempTablesState tempTablesState, const TActorId& target);
diff --git a/ydb/core/kqp/session_actor/kqp_worker_actor.cpp b/ydb/core/kqp/session_actor/kqp_worker_actor.cpp
index 6cf0b17d17a..c10a443d14a 100644
--- a/ydb/core/kqp/session_actor/kqp_worker_actor.cpp
+++ b/ydb/core/kqp/session_actor/kqp_worker_actor.cpp
@@ -95,17 +95,16 @@ public:
}
TKqpWorkerActor(const TActorId& owner, const TString& sessionId, const TKqpSettings::TConstPtr& kqpSettings,
- const TKqpWorkerSettings& workerSettings, NYql::IHTTPGateway::TPtr httpGateway,
+ const TKqpWorkerSettings& workerSettings, std::optional<TKqpFederatedQuerySetup> federatedQuerySetup,
TIntrusivePtr<TModuleResolverState> moduleResolverState, TIntrusivePtr<TKqpCounters> counters,
- NYql::ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory,
- const NKikimrConfig::TMetadataProviderConfig& metadataProviderConfig)
+ const NKikimrConfig::TMetadataProviderConfig& metadataProviderConfig
+ )
: Owner(owner)
, SessionId(sessionId)
, Settings(workerSettings)
- , HttpGateway(std::move(httpGateway))
+ , FederatedQuerySetup(federatedQuerySetup)
, ModuleResolverState(moduleResolverState)
, Counters(counters)
- , CredentialsFactory(std::move(credentialsFactory))
, Config(MakeIntrusive<TKikimrConfiguration>())
, MetadataProviderConfig(metadataProviderConfig)
, CreationTime(TInstant::Now())
@@ -143,7 +142,7 @@ public:
Config->FeatureFlags = AppData(ctx)->FeatureFlags;
KqpHost = CreateKqpHost(Gateway, Settings.Cluster, Settings.Database, Config, ModuleResolverState->ModuleResolver,
- HttpGateway, AppData(ctx)->FunctionRegistry, !Settings.LongSession, false, CredentialsFactory);
+ FederatedQuerySetup, AppData(ctx)->FunctionRegistry, !Settings.LongSession, false);
Become(&TKqpWorkerActor::ReadyState);
}
@@ -1060,10 +1059,9 @@ private:
TActorId Owner;
TString SessionId;
TKqpWorkerSettings Settings;
- NYql::IHTTPGateway::TPtr HttpGateway;
+ std::optional<TKqpFederatedQuerySetup> FederatedQuerySetup;
TIntrusivePtr<TModuleResolverState> ModuleResolverState;
TIntrusivePtr<TKqpCounters> Counters;
- NYql::ISecuredServiceAccountCredentialsFactory::TPtr CredentialsFactory;
TIntrusivePtr<TKqpRequestCounters> RequestCounters;
TKikimrConfiguration::TPtr Config;
NKikimrConfig::TMetadataProviderConfig MetadataProviderConfig;
@@ -1080,12 +1078,13 @@ private:
IActor* CreateKqpWorkerActor(const TActorId& owner, const TString& sessionId,
const TKqpSettings::TConstPtr& kqpSettings, const TKqpWorkerSettings& workerSettings,
- NYql::IHTTPGateway::TPtr httpGateway,
+ std::optional<TKqpFederatedQuerySetup> federatedQuerySetup,
TIntrusivePtr<TModuleResolverState> moduleResolverState, TIntrusivePtr<TKqpCounters> counters,
- NYql::ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory,
- const NKikimrConfig::TMetadataProviderConfig& metadataProviderConfig)
+ const NKikimrConfig::TMetadataProviderConfig& metadataProviderConfig
+ )
{
- return new TKqpWorkerActor(owner, sessionId, kqpSettings, workerSettings, std::move(httpGateway), moduleResolverState, counters, std::move(credentialsFactory), metadataProviderConfig);
+ return new TKqpWorkerActor(owner, sessionId, kqpSettings, workerSettings, federatedQuerySetup,
+ moduleResolverState, counters, metadataProviderConfig);
}
} // namespace NKqp
diff --git a/ydb/core/kqp/session_actor/kqp_worker_common.h b/ydb/core/kqp/session_actor/kqp_worker_common.h
index c7254a4fa8a..d0d501ff1b9 100644
--- a/ydb/core/kqp/session_actor/kqp_worker_common.h
+++ b/ydb/core/kqp/session_actor/kqp_worker_common.h
@@ -8,8 +8,6 @@
#include <ydb/core/kqp/provider/yql_kikimr_provider.h>
#include <ydb/core/kqp/provider/yql_kikimr_settings.h>
#include <ydb/core/protos/kqp.pb.h>
-#include <ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h>
-#include <ydb/library/yql/providers/common/token_accessor/client/factory.h>
#include <library/cpp/actors/core/actor.h>
#include <library/cpp/actors/core/log.h>
@@ -147,9 +145,11 @@ bool HasSchemeOrFatalIssues(const NYql::TIssues& issues);
IActor* CreateKqpWorkerActor(const TActorId& owner, const TString& sessionId,
const TKqpSettings::TConstPtr& kqpSettings, const TKqpWorkerSettings& workerSettings,
- NYql::IHTTPGateway::TPtr httpGateway, TIntrusivePtr<TModuleResolverState> moduleResolverState,
- TIntrusivePtr<TKqpCounters> counters, NYql::ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory,
- const NKikimrConfig::TMetadataProviderConfig& metadataProviderConfig);
+ std::optional<TKqpFederatedQuerySetup> federatedQuerySetup,
+ TIntrusivePtr<TModuleResolverState> moduleResolverState,
+ TIntrusivePtr<TKqpCounters> counters,
+ const NKikimrConfig::TMetadataProviderConfig& metadataProviderConfig
+ );
bool IsSameProtoType(const NKikimrMiniKQL::TType& actual, const NKikimrMiniKQL::TType& expected);
diff --git a/ydb/core/kqp/session_actor/ya.make b/ydb/core/kqp/session_actor/ya.make
index 04693ca4fdb..49d8ebfe83b 100644
--- a/ydb/core/kqp/session_actor/ya.make
+++ b/ydb/core/kqp/session_actor/ya.make
@@ -13,7 +13,7 @@ SRCS(
PEERDIR(
ydb/core/docapi
ydb/core/kqp/common
- ydb/library/yql/providers/common/http_gateway
+ ydb/core/kqp/federated_query
ydb/public/lib/operation_id
)
diff --git a/ydb/core/kqp/ut/common/CMakeLists.darwin-x86_64.txt b/ydb/core/kqp/ut/common/CMakeLists.darwin-x86_64.txt
index 9b01d3392d7..8ebd0100ee4 100644
--- a/ydb/core/kqp/ut/common/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/kqp/ut/common/CMakeLists.darwin-x86_64.txt
@@ -14,6 +14,7 @@ target_compile_options(kqp-ut-common PRIVATE
target_link_libraries(kqp-ut-common PUBLIC
contrib-libs-cxxsupp
yutil
+ core-kqp-federated_query
ydb-core-testlib
yql-public-udf
string_udf
diff --git a/ydb/core/kqp/ut/common/CMakeLists.linux-aarch64.txt b/ydb/core/kqp/ut/common/CMakeLists.linux-aarch64.txt
index d54f6db0bf1..091762509e9 100644
--- a/ydb/core/kqp/ut/common/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/kqp/ut/common/CMakeLists.linux-aarch64.txt
@@ -15,6 +15,7 @@ target_link_libraries(kqp-ut-common PUBLIC
contrib-libs-linux-headers
contrib-libs-cxxsupp
yutil
+ core-kqp-federated_query
ydb-core-testlib
yql-public-udf
string_udf
diff --git a/ydb/core/kqp/ut/common/CMakeLists.linux-x86_64.txt b/ydb/core/kqp/ut/common/CMakeLists.linux-x86_64.txt
index d54f6db0bf1..091762509e9 100644
--- a/ydb/core/kqp/ut/common/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/kqp/ut/common/CMakeLists.linux-x86_64.txt
@@ -15,6 +15,7 @@ target_link_libraries(kqp-ut-common PUBLIC
contrib-libs-linux-headers
contrib-libs-cxxsupp
yutil
+ core-kqp-federated_query
ydb-core-testlib
yql-public-udf
string_udf
diff --git a/ydb/core/kqp/ut/common/CMakeLists.windows-x86_64.txt b/ydb/core/kqp/ut/common/CMakeLists.windows-x86_64.txt
index 9b01d3392d7..8ebd0100ee4 100644
--- a/ydb/core/kqp/ut/common/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/kqp/ut/common/CMakeLists.windows-x86_64.txt
@@ -14,6 +14,7 @@ target_compile_options(kqp-ut-common PRIVATE
target_link_libraries(kqp-ut-common PUBLIC
contrib-libs-cxxsupp
yutil
+ core-kqp-federated_query
ydb-core-testlib
yql-public-udf
string_udf
diff --git a/ydb/core/kqp/ut/common/kqp_ut_common.cpp b/ydb/core/kqp/ut/common/kqp_ut_common.cpp
index fa2fc041c6e..02d2543f15b 100644
--- a/ydb/core/kqp/ut/common/kqp_ut_common.cpp
+++ b/ydb/core/kqp/ut/common/kqp_ut_common.cpp
@@ -131,6 +131,10 @@ TKikimrRunner::TKikimrRunner(const TKikimrSettings& settings) {
if (settings.LogStream)
ServerSettings->SetLogBackend(new TStreamLogBackend(settings.LogStream));
+ if (settings.FederatedQuerySetupFactory) {
+ ServerSettings->SetFederatedQuerySetupFactory(settings.FederatedQuerySetupFactory);
+ }
+
Server.Reset(MakeHolder<Tests::TServer>(*ServerSettings));
Server->EnableGRpc(grpcPort);
Server->SetupDefaultProfiles();
diff --git a/ydb/core/kqp/ut/common/kqp_ut_common.h b/ydb/core/kqp/ut/common/kqp_ut_common.h
index 22e76d202b1..44bd0035f38 100644
--- a/ydb/core/kqp/ut/common/kqp_ut_common.h
+++ b/ydb/core/kqp/ut/common/kqp_ut_common.h
@@ -1,8 +1,9 @@
#pragma once
#include <ydb/core/testlib/test_client.h>
-
+#include <ydb/core/kqp/federated_query/kqp_federated_query_helpers.h>
#include <ydb/core/tx/scheme_cache/scheme_cache.h>
+#include <ydb/library/yql/core/issue/yql_issue.h>
#include <ydb/public/lib/yson_value/ydb_yson_value.h>
#include <ydb/public/sdk/cpp/client/ydb_query/client.h>
#include <ydb/public/sdk/cpp/client/draft/ydb_scripting.h>
@@ -10,9 +11,6 @@
#include <ydb/public/sdk/cpp/client/ydb_table/table.h>
#include <library/cpp/yson/node/node_io.h>
-
-#include <ydb/library/yql/core/issue/yql_issue.h>
-
#include <library/cpp/json/json_reader.h>
#include <library/cpp/testing/unittest/tests_data.h>
#include <library/cpp/testing/unittest/registar.h>
@@ -83,6 +81,7 @@ struct TKikimrSettings: public TTestFeatureFlagsHolder<TKikimrSettings> {
TDuration KeepSnapshotTimeout = TDuration::Zero();
IOutputStream* LogStream = nullptr;
TMaybe<NFake::TStorage> Storage = Nothing();
+ NKqp::IKqpFederatedQuerySetupFactory::TPtr FederatedQuerySetupFactory = std::make_shared<NKqp::TKqpFederatedQuerySetupFactoryNoop>();
TKikimrSettings()
{
@@ -106,6 +105,7 @@ struct TKikimrSettings: public TTestFeatureFlagsHolder<TKikimrSettings> {
TKikimrSettings& SetKeepSnapshotTimeout(TDuration value) { KeepSnapshotTimeout = value; return *this; }
TKikimrSettings& SetLogStream(IOutputStream* follower) { LogStream = follower; return *this; };
TKikimrSettings& SetStorage(const NFake::TStorage& storage) { Storage = storage; return *this; };
+ TKikimrSettings& SetFederatedQuerySetupFactory(NKqp::IKqpFederatedQuerySetupFactory::TPtr value) { FederatedQuerySetupFactory = value; return *this; };
};
class TKikimrRunner {
diff --git a/ydb/core/kqp/ut/common/ya.make b/ydb/core/kqp/ut/common/ya.make
index 34c9747558b..30978c9412b 100644
--- a/ydb/core/kqp/ut/common/ya.make
+++ b/ydb/core/kqp/ut/common/ya.make
@@ -9,6 +9,7 @@ SRCS(
)
PEERDIR(
+ ydb/core/kqp/federated_query
ydb/core/testlib
ydb/library/yql/public/udf
ydb/library/yql/udfs/common/string
diff --git a/ydb/core/kqp/ut/federated_query/CMakeLists.txt b/ydb/core/kqp/ut/federated_query/CMakeLists.txt
index f8b31df0c11..f9b0afd798f 100644
--- a/ydb/core/kqp/ut/federated_query/CMakeLists.txt
+++ b/ydb/core/kqp/ut/federated_query/CMakeLists.txt
@@ -6,12 +6,5 @@
# original buildsystem will not be accepted.
-if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA)
- include(CMakeLists.linux-aarch64.txt)
-elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64")
- include(CMakeLists.darwin-x86_64.txt)
-elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA)
- include(CMakeLists.windows-x86_64.txt)
-elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA)
- include(CMakeLists.linux-x86_64.txt)
-endif()
+add_subdirectory(common)
+add_subdirectory(s3)
diff --git a/ydb/core/kqp/ut/federated_query/common/CMakeLists.darwin-x86_64.txt b/ydb/core/kqp/ut/federated_query/common/CMakeLists.darwin-x86_64.txt
new file mode 100644
index 00000000000..bbe698f755e
--- /dev/null
+++ b/ydb/core/kqp/ut/federated_query/common/CMakeLists.darwin-x86_64.txt
@@ -0,0 +1,23 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(ut-federated_query-common)
+target_compile_options(ut-federated_query-common PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_link_libraries(ut-federated_query-common PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ kqp-ut-common
+ cpp-client-ydb_operation
+ cpp-client-ydb_query
+)
+target_sources(ut-federated_query-common PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/kqp/ut/federated_query/common/common.cpp
+)
diff --git a/ydb/core/kqp/ut/federated_query/common/CMakeLists.linux-aarch64.txt b/ydb/core/kqp/ut/federated_query/common/CMakeLists.linux-aarch64.txt
new file mode 100644
index 00000000000..c228a1c938b
--- /dev/null
+++ b/ydb/core/kqp/ut/federated_query/common/CMakeLists.linux-aarch64.txt
@@ -0,0 +1,24 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(ut-federated_query-common)
+target_compile_options(ut-federated_query-common PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_link_libraries(ut-federated_query-common PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ kqp-ut-common
+ cpp-client-ydb_operation
+ cpp-client-ydb_query
+)
+target_sources(ut-federated_query-common PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/kqp/ut/federated_query/common/common.cpp
+)
diff --git a/ydb/core/kqp/ut/federated_query/common/CMakeLists.linux-x86_64.txt b/ydb/core/kqp/ut/federated_query/common/CMakeLists.linux-x86_64.txt
new file mode 100644
index 00000000000..c228a1c938b
--- /dev/null
+++ b/ydb/core/kqp/ut/federated_query/common/CMakeLists.linux-x86_64.txt
@@ -0,0 +1,24 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(ut-federated_query-common)
+target_compile_options(ut-federated_query-common PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_link_libraries(ut-federated_query-common PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ kqp-ut-common
+ cpp-client-ydb_operation
+ cpp-client-ydb_query
+)
+target_sources(ut-federated_query-common PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/kqp/ut/federated_query/common/common.cpp
+)
diff --git a/ydb/core/kqp/ut/federated_query/common/CMakeLists.txt b/ydb/core/kqp/ut/federated_query/common/CMakeLists.txt
new file mode 100644
index 00000000000..f8b31df0c11
--- /dev/null
+++ b/ydb/core/kqp/ut/federated_query/common/CMakeLists.txt
@@ -0,0 +1,17 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-aarch64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64")
+ include(CMakeLists.darwin-x86_64.txt)
+elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA)
+ include(CMakeLists.windows-x86_64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-x86_64.txt)
+endif()
diff --git a/ydb/core/kqp/ut/federated_query/common/CMakeLists.windows-x86_64.txt b/ydb/core/kqp/ut/federated_query/common/CMakeLists.windows-x86_64.txt
new file mode 100644
index 00000000000..bbe698f755e
--- /dev/null
+++ b/ydb/core/kqp/ut/federated_query/common/CMakeLists.windows-x86_64.txt
@@ -0,0 +1,23 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(ut-federated_query-common)
+target_compile_options(ut-federated_query-common PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_link_libraries(ut-federated_query-common PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ kqp-ut-common
+ cpp-client-ydb_operation
+ cpp-client-ydb_query
+)
+target_sources(ut-federated_query-common PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/kqp/ut/federated_query/common/common.cpp
+)
diff --git a/ydb/core/kqp/ut/federated_query/common/common.cpp b/ydb/core/kqp/ut/federated_query/common/common.cpp
new file mode 100644
index 00000000000..1c8faf78925
--- /dev/null
+++ b/ydb/core/kqp/ut/federated_query/common/common.cpp
@@ -0,0 +1,44 @@
+#include "common.h"
+
+#include <library/cpp/testing/unittest/registar.h>
+
+namespace NKikimr::NKqp::NFederatedQueryTest {
+
+ NYdb::NQuery::TScriptExecutionOperation WaitScriptExecutionOperation(const NYdb::TOperation::TOperationId& operationId, const NYdb::TDriver& ydbDriver) {
+ NYdb::NOperation::TOperationClient client(ydbDriver);
+ NThreading::TFuture<NYdb::NQuery::TScriptExecutionOperation> op;
+ do {
+ if (op.Initialized()) {
+ Sleep(TDuration::MilliSeconds(10));
+ }
+ op = client.Get<NYdb::NQuery::TScriptExecutionOperation>(operationId);
+ UNIT_ASSERT_C(op.GetValueSync().Status().IsSuccess(), TStringBuilder() << op.GetValueSync().Status().GetStatus() << ":" << op.GetValueSync().Status().GetIssues().ToString());
+ } while (!op.GetValueSync().Ready());
+ return op.GetValueSync();
+ }
+
+ std::shared_ptr<TKikimrRunner> MakeKikimrRunner(
+ NYql::IHTTPGateway::TPtr httpGateway,
+ NYql::NConnector::IClient::TPtr connectorClient,
+ std::optional<NKikimrConfig::TAppConfig> appConfig)
+ {
+ NKikimrConfig::TFeatureFlags featureFlags;
+ featureFlags.SetEnableExternalDataSources(true);
+ featureFlags.SetEnableScriptExecutionOperations(true);
+
+ auto federatedQuerySetupFactory = std::make_shared<TKqpFederatedQuerySetupFactoryMock>(
+ httpGateway, connectorClient, nullptr, nullptr
+ );
+
+ auto settings = TKikimrSettings()
+ .SetFeatureFlags(featureFlags)
+ .SetFederatedQuerySetupFactory(federatedQuerySetupFactory)
+ .SetKqpSettings({})
+ .SetEnableScriptExecutionOperations(true);
+
+ settings = appConfig ? settings.SetAppConfig(appConfig.value()) : settings.SetAppConfig({});
+
+ return std::make_shared<TKikimrRunner>(settings);
+ }
+
+}
diff --git a/ydb/core/kqp/ut/federated_query/common/common.h b/ydb/core/kqp/ut/federated_query/common/common.h
new file mode 100644
index 00000000000..e55b2961d2a
--- /dev/null
+++ b/ydb/core/kqp/ut/federated_query/common/common.h
@@ -0,0 +1,19 @@
+#pragma once
+
+#include <ydb/core/kqp/ut/common/kqp_ut_common.h>
+#include <ydb/public/sdk/cpp/client/ydb_query/query.h>
+#include <ydb/public/sdk/cpp/client/ydb_operation/operation.h>
+
+namespace NKikimr::NKqp::NFederatedQueryTest {
+ using namespace NKikimr::NKqp;
+
+ NYdb::NQuery::TScriptExecutionOperation WaitScriptExecutionOperation(
+ const NYdb::TOperation::TOperationId& operationId,
+ const NYdb::TDriver& ydbDriver);
+
+ std::shared_ptr<TKikimrRunner> MakeKikimrRunner(
+ NYql::IHTTPGateway::TPtr httpGateway = nullptr,
+ NYql::NConnector::IClient::TPtr connectorClient = nullptr,
+ std::optional<NKikimrConfig::TAppConfig> appConfig = std::nullopt
+ );
+}
diff --git a/ydb/core/kqp/ut/federated_query/common/ya.make b/ydb/core/kqp/ut/federated_query/common/ya.make
new file mode 100644
index 00000000000..de8a77b04b8
--- /dev/null
+++ b/ydb/core/kqp/ut/federated_query/common/ya.make
@@ -0,0 +1,15 @@
+LIBRARY()
+
+SRCS(
+ common.cpp
+)
+
+PEERDIR(
+ ydb/core/kqp/ut/common
+ ydb/public/sdk/cpp/client/ydb_operation
+ ydb/public/sdk/cpp/client/ydb_query
+)
+
+YQL_LAST_ABI_VERSION()
+
+END()
diff --git a/ydb/core/kqp/ut/federated_query/run_testpack b/ydb/core/kqp/ut/federated_query/run_testpack
deleted file mode 100755
index 5be30f1a7d7..00000000000
--- a/ydb/core/kqp/ut/federated_query/run_testpack
+++ /dev/null
@@ -1,28 +0,0 @@
-#!/usr/bin/env bash
-set -e
-CTX_DIR=$(mktemp -d)
-echo Working dir: $CTX_DIR
-cd "$CTX_DIR"
-
-# Start server
-MOTO_SERVER_EXECUTABLE=moto_server $source_root/ydb/tests/tools/s3_recipe/start.sh $CTX_DIR
-
-# Run test
-set +e
-S3_ENDPOINT=$(cat $CTX_DIR/env.json | grep 'S3_ENDPOINT' | cut -f 4 -d '"') \
-$build_root/ydb/core/kqp/ut/federated_query/ydb-core-kqp-ut-federated_query $*
-
-code=$?
-if [ $code -gt 0 ];then
- echo
- echo "Test execution failed"
- echo
-fi
-
-# Stop server (removes working dir!)
-$source_root/ydb/tests/tools/s3_recipe/stop.sh $CTX_DIR
-
-# Return result code from test run
-if [ $code -gt 0 ];then
- exit $code
-fi
diff --git a/ydb/core/kqp/ut/federated_query/CMakeLists.darwin-x86_64.txt b/ydb/core/kqp/ut/federated_query/s3/CMakeLists.darwin-x86_64.txt
index 20bf70d4a8e..7498df42dfa 100644
--- a/ydb/core/kqp/ut/federated_query/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/kqp/ut/federated_query/s3/CMakeLists.darwin-x86_64.txt
@@ -7,14 +7,14 @@
-add_executable(ydb-core-kqp-ut-federated_query)
-target_compile_options(ydb-core-kqp-ut-federated_query PRIVATE
+add_executable(ydb-core-kqp-ut-federated_query-s3)
+target_compile_options(ydb-core-kqp-ut-federated_query-s3 PRIVATE
-DUSE_CURRENT_UDF_ABI_VERSION
)
-target_include_directories(ydb-core-kqp-ut-federated_query PRIVATE
+target_include_directories(ydb-core-kqp-ut-federated_query-s3 PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kqp
)
-target_link_libraries(ydb-core-kqp-ut-federated_query PUBLIC
+target_link_libraries(ydb-core-kqp-ut-federated_query-s3 PUBLIC
contrib-libs-cxxsupp
yutil
library-cpp-cpuid_check
@@ -22,32 +22,32 @@ target_link_libraries(ydb-core-kqp-ut-federated_query PUBLIC
ydb-core-kqp
libs-aws-sdk-cpp-aws-cpp-sdk-s3
kqp-ut-common
+ ut-federated_query-common
yql-sql-pg_dummy
- cpp-client-ydb_operation
client-ydb_types-operation
)
-target_link_options(ydb-core-kqp-ut-federated_query PRIVATE
+target_link_options(ydb-core-kqp-ut-federated_query-s3 PRIVATE
-Wl,-platform_version,macos,11.0,11.0
-fPIC
-fPIC
-framework
CoreFoundation
)
-target_sources(ydb-core-kqp-ut-federated_query PRIVATE
- ${CMAKE_SOURCE_DIR}/ydb/core/kqp/ut/federated_query/kqp_federated_query_ut.cpp
+target_sources(ydb-core-kqp-ut-federated_query-s3 PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/kqp/ut/federated_query/s3/kqp_federated_query_ut.cpp
)
set_property(
TARGET
- ydb-core-kqp-ut-federated_query
+ ydb-core-kqp-ut-federated_query-s3
PROPERTY
SPLIT_FACTOR
1
)
add_yunittest(
NAME
- ydb-core-kqp-ut-federated_query
+ ydb-core-kqp-ut-federated_query-s3
TEST_TARGET
- ydb-core-kqp-ut-federated_query
+ ydb-core-kqp-ut-federated_query-s3
TEST_ARG
--print-before-suite
--print-before-test
@@ -57,26 +57,26 @@ add_yunittest(
)
set_yunittest_property(
TEST
- ydb-core-kqp-ut-federated_query
+ ydb-core-kqp-ut-federated_query-s3
PROPERTY
LABELS
MEDIUM
)
set_yunittest_property(
TEST
- ydb-core-kqp-ut-federated_query
+ ydb-core-kqp-ut-federated_query-s3
PROPERTY
PROCESSORS
1
)
set_yunittest_property(
TEST
- ydb-core-kqp-ut-federated_query
+ ydb-core-kqp-ut-federated_query-s3
PROPERTY
TIMEOUT
600
)
-target_allocator(ydb-core-kqp-ut-federated_query
+target_allocator(ydb-core-kqp-ut-federated_query-s3
system_allocator
)
-vcs_info(ydb-core-kqp-ut-federated_query)
+vcs_info(ydb-core-kqp-ut-federated_query-s3)
diff --git a/ydb/core/kqp/ut/federated_query/CMakeLists.linux-aarch64.txt b/ydb/core/kqp/ut/federated_query/s3/CMakeLists.linux-aarch64.txt
index 3f8fc55a471..bd342c62e11 100644
--- a/ydb/core/kqp/ut/federated_query/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/kqp/ut/federated_query/s3/CMakeLists.linux-aarch64.txt
@@ -7,14 +7,14 @@
-add_executable(ydb-core-kqp-ut-federated_query)
-target_compile_options(ydb-core-kqp-ut-federated_query PRIVATE
+add_executable(ydb-core-kqp-ut-federated_query-s3)
+target_compile_options(ydb-core-kqp-ut-federated_query-s3 PRIVATE
-DUSE_CURRENT_UDF_ABI_VERSION
)
-target_include_directories(ydb-core-kqp-ut-federated_query PRIVATE
+target_include_directories(ydb-core-kqp-ut-federated_query-s3 PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kqp
)
-target_link_libraries(ydb-core-kqp-ut-federated_query PUBLIC
+target_link_libraries(ydb-core-kqp-ut-federated_query-s3 PUBLIC
contrib-libs-linux-headers
contrib-libs-cxxsupp
yutil
@@ -22,11 +22,11 @@ target_link_libraries(ydb-core-kqp-ut-federated_query PUBLIC
ydb-core-kqp
libs-aws-sdk-cpp-aws-cpp-sdk-s3
kqp-ut-common
+ ut-federated_query-common
yql-sql-pg_dummy
- cpp-client-ydb_operation
client-ydb_types-operation
)
-target_link_options(ydb-core-kqp-ut-federated_query PRIVATE
+target_link_options(ydb-core-kqp-ut-federated_query-s3 PRIVATE
-ldl
-lrt
-Wl,--no-as-needed
@@ -36,21 +36,21 @@ target_link_options(ydb-core-kqp-ut-federated_query PRIVATE
-lrt
-ldl
)
-target_sources(ydb-core-kqp-ut-federated_query PRIVATE
- ${CMAKE_SOURCE_DIR}/ydb/core/kqp/ut/federated_query/kqp_federated_query_ut.cpp
+target_sources(ydb-core-kqp-ut-federated_query-s3 PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/kqp/ut/federated_query/s3/kqp_federated_query_ut.cpp
)
set_property(
TARGET
- ydb-core-kqp-ut-federated_query
+ ydb-core-kqp-ut-federated_query-s3
PROPERTY
SPLIT_FACTOR
1
)
add_yunittest(
NAME
- ydb-core-kqp-ut-federated_query
+ ydb-core-kqp-ut-federated_query-s3
TEST_TARGET
- ydb-core-kqp-ut-federated_query
+ ydb-core-kqp-ut-federated_query-s3
TEST_ARG
--print-before-suite
--print-before-test
@@ -60,26 +60,26 @@ add_yunittest(
)
set_yunittest_property(
TEST
- ydb-core-kqp-ut-federated_query
+ ydb-core-kqp-ut-federated_query-s3
PROPERTY
LABELS
MEDIUM
)
set_yunittest_property(
TEST
- ydb-core-kqp-ut-federated_query
+ ydb-core-kqp-ut-federated_query-s3
PROPERTY
PROCESSORS
1
)
set_yunittest_property(
TEST
- ydb-core-kqp-ut-federated_query
+ ydb-core-kqp-ut-federated_query-s3
PROPERTY
TIMEOUT
600
)
-target_allocator(ydb-core-kqp-ut-federated_query
+target_allocator(ydb-core-kqp-ut-federated_query-s3
cpp-malloc-jemalloc
)
-vcs_info(ydb-core-kqp-ut-federated_query)
+vcs_info(ydb-core-kqp-ut-federated_query-s3)
diff --git a/ydb/core/kqp/ut/federated_query/CMakeLists.linux-x86_64.txt b/ydb/core/kqp/ut/federated_query/s3/CMakeLists.linux-x86_64.txt
index d494f203529..5d3f80e8e31 100644
--- a/ydb/core/kqp/ut/federated_query/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/kqp/ut/federated_query/s3/CMakeLists.linux-x86_64.txt
@@ -7,14 +7,14 @@
-add_executable(ydb-core-kqp-ut-federated_query)
-target_compile_options(ydb-core-kqp-ut-federated_query PRIVATE
+add_executable(ydb-core-kqp-ut-federated_query-s3)
+target_compile_options(ydb-core-kqp-ut-federated_query-s3 PRIVATE
-DUSE_CURRENT_UDF_ABI_VERSION
)
-target_include_directories(ydb-core-kqp-ut-federated_query PRIVATE
+target_include_directories(ydb-core-kqp-ut-federated_query-s3 PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kqp
)
-target_link_libraries(ydb-core-kqp-ut-federated_query PUBLIC
+target_link_libraries(ydb-core-kqp-ut-federated_query-s3 PUBLIC
contrib-libs-linux-headers
contrib-libs-cxxsupp
yutil
@@ -23,11 +23,11 @@ target_link_libraries(ydb-core-kqp-ut-federated_query PUBLIC
ydb-core-kqp
libs-aws-sdk-cpp-aws-cpp-sdk-s3
kqp-ut-common
+ ut-federated_query-common
yql-sql-pg_dummy
- cpp-client-ydb_operation
client-ydb_types-operation
)
-target_link_options(ydb-core-kqp-ut-federated_query PRIVATE
+target_link_options(ydb-core-kqp-ut-federated_query-s3 PRIVATE
-ldl
-lrt
-Wl,--no-as-needed
@@ -37,21 +37,21 @@ target_link_options(ydb-core-kqp-ut-federated_query PRIVATE
-lrt
-ldl
)
-target_sources(ydb-core-kqp-ut-federated_query PRIVATE
- ${CMAKE_SOURCE_DIR}/ydb/core/kqp/ut/federated_query/kqp_federated_query_ut.cpp
+target_sources(ydb-core-kqp-ut-federated_query-s3 PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/kqp/ut/federated_query/s3/kqp_federated_query_ut.cpp
)
set_property(
TARGET
- ydb-core-kqp-ut-federated_query
+ ydb-core-kqp-ut-federated_query-s3
PROPERTY
SPLIT_FACTOR
1
)
add_yunittest(
NAME
- ydb-core-kqp-ut-federated_query
+ ydb-core-kqp-ut-federated_query-s3
TEST_TARGET
- ydb-core-kqp-ut-federated_query
+ ydb-core-kqp-ut-federated_query-s3
TEST_ARG
--print-before-suite
--print-before-test
@@ -61,27 +61,27 @@ add_yunittest(
)
set_yunittest_property(
TEST
- ydb-core-kqp-ut-federated_query
+ ydb-core-kqp-ut-federated_query-s3
PROPERTY
LABELS
MEDIUM
)
set_yunittest_property(
TEST
- ydb-core-kqp-ut-federated_query
+ ydb-core-kqp-ut-federated_query-s3
PROPERTY
PROCESSORS
1
)
set_yunittest_property(
TEST
- ydb-core-kqp-ut-federated_query
+ ydb-core-kqp-ut-federated_query-s3
PROPERTY
TIMEOUT
600
)
-target_allocator(ydb-core-kqp-ut-federated_query
+target_allocator(ydb-core-kqp-ut-federated_query-s3
cpp-malloc-tcmalloc
libs-tcmalloc-no_percpu_cache
)
-vcs_info(ydb-core-kqp-ut-federated_query)
+vcs_info(ydb-core-kqp-ut-federated_query-s3)
diff --git a/ydb/core/kqp/ut/federated_query/s3/CMakeLists.txt b/ydb/core/kqp/ut/federated_query/s3/CMakeLists.txt
new file mode 100644
index 00000000000..f8b31df0c11
--- /dev/null
+++ b/ydb/core/kqp/ut/federated_query/s3/CMakeLists.txt
@@ -0,0 +1,17 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-aarch64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64")
+ include(CMakeLists.darwin-x86_64.txt)
+elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA)
+ include(CMakeLists.windows-x86_64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-x86_64.txt)
+endif()
diff --git a/ydb/core/kqp/ut/federated_query/CMakeLists.windows-x86_64.txt b/ydb/core/kqp/ut/federated_query/s3/CMakeLists.windows-x86_64.txt
index 04e259e6c76..e8ca79a81a1 100644
--- a/ydb/core/kqp/ut/federated_query/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/kqp/ut/federated_query/s3/CMakeLists.windows-x86_64.txt
@@ -7,14 +7,14 @@
-add_executable(ydb-core-kqp-ut-federated_query)
-target_compile_options(ydb-core-kqp-ut-federated_query PRIVATE
+add_executable(ydb-core-kqp-ut-federated_query-s3)
+target_compile_options(ydb-core-kqp-ut-federated_query-s3 PRIVATE
-DUSE_CURRENT_UDF_ABI_VERSION
)
-target_include_directories(ydb-core-kqp-ut-federated_query PRIVATE
+target_include_directories(ydb-core-kqp-ut-federated_query-s3 PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kqp
)
-target_link_libraries(ydb-core-kqp-ut-federated_query PUBLIC
+target_link_libraries(ydb-core-kqp-ut-federated_query-s3 PUBLIC
contrib-libs-cxxsupp
yutil
library-cpp-cpuid_check
@@ -22,25 +22,25 @@ target_link_libraries(ydb-core-kqp-ut-federated_query PUBLIC
ydb-core-kqp
libs-aws-sdk-cpp-aws-cpp-sdk-s3
kqp-ut-common
+ ut-federated_query-common
yql-sql-pg_dummy
- cpp-client-ydb_operation
client-ydb_types-operation
)
-target_sources(ydb-core-kqp-ut-federated_query PRIVATE
- ${CMAKE_SOURCE_DIR}/ydb/core/kqp/ut/federated_query/kqp_federated_query_ut.cpp
+target_sources(ydb-core-kqp-ut-federated_query-s3 PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/kqp/ut/federated_query/s3/kqp_federated_query_ut.cpp
)
set_property(
TARGET
- ydb-core-kqp-ut-federated_query
+ ydb-core-kqp-ut-federated_query-s3
PROPERTY
SPLIT_FACTOR
1
)
add_yunittest(
NAME
- ydb-core-kqp-ut-federated_query
+ ydb-core-kqp-ut-federated_query-s3
TEST_TARGET
- ydb-core-kqp-ut-federated_query
+ ydb-core-kqp-ut-federated_query-s3
TEST_ARG
--print-before-suite
--print-before-test
@@ -50,26 +50,26 @@ add_yunittest(
)
set_yunittest_property(
TEST
- ydb-core-kqp-ut-federated_query
+ ydb-core-kqp-ut-federated_query-s3
PROPERTY
LABELS
MEDIUM
)
set_yunittest_property(
TEST
- ydb-core-kqp-ut-federated_query
+ ydb-core-kqp-ut-federated_query-s3
PROPERTY
PROCESSORS
1
)
set_yunittest_property(
TEST
- ydb-core-kqp-ut-federated_query
+ ydb-core-kqp-ut-federated_query-s3
PROPERTY
TIMEOUT
600
)
-target_allocator(ydb-core-kqp-ut-federated_query
+target_allocator(ydb-core-kqp-ut-federated_query-s3
system_allocator
)
-vcs_info(ydb-core-kqp-ut-federated_query)
+vcs_info(ydb-core-kqp-ut-federated_query-s3)
diff --git a/ydb/core/kqp/ut/federated_query/kqp_federated_query_ut.cpp b/ydb/core/kqp/ut/federated_query/s3/kqp_federated_query_ut.cpp
index 93e644cba81..aff9a1a45f7 100644
--- a/ydb/core/kqp/ut/federated_query/kqp_federated_query_ut.cpp
+++ b/ydb/core/kqp/ut/federated_query/s3/kqp_federated_query_ut.cpp
@@ -1,5 +1,3 @@
-#include <ydb/core/kqp/ut/common/kqp_ut_common.h>
-
#include <util/system/env.h>
#include <aws/core/auth/AWSCredentialsProvider.h>
@@ -10,20 +8,24 @@
#include <aws/s3/model/PutObjectRequest.h>
#include <aws/s3/S3Client.h>
+#include <ydb/core/kqp/ut/common/kqp_ut_common.h>
+#include <ydb/core/kqp/ut/federated_query/common/common.h>
+#include <ydb/core/kqp/federated_query/kqp_federated_query_helpers.h>
#include <ydb/library/yql/utils/log/log.h>
#include <ydb/public/sdk/cpp/client/ydb_operation/operation.h>
#include <ydb/public/sdk/cpp/client/ydb_proto/accessor.h>
#include <ydb/public/sdk/cpp/client/ydb_types/operation/operation.h>
+#include <ydb/public/sdk/cpp/client/ydb_table/table.h>
#include <fmt/format.h>
-#include <ydb/public/sdk/cpp/client/ydb_table/table.h>
namespace NKikimr {
namespace NKqp {
using namespace NYdb;
using namespace NYdb::NQuery;
+using namespace NKikimr::NKqp::NFederatedQueryTest;
constexpr TStringBuf TEST_CONTENT =
R"({"key": "1", "value": "trololo"}
@@ -164,18 +166,6 @@ TString GetBucketLocation(const TStringBuf bucket) {
return TStringBuilder() << GetEnv("S3_ENDPOINT") << '/' << bucket << '/';
}
-NYdb::NQuery::TScriptExecutionOperation WaitScriptExecutionOperation(const NYdb::TOperation::TOperationId& operationId, const NYdb::TDriver& ydbDriver) {
- NYdb::NOperation::TOperationClient client(ydbDriver);
- NThreading::TFuture<NYdb::NQuery::TScriptExecutionOperation> op;
- do {
- if (op.Initialized()) {
- Sleep(TDuration::MilliSeconds(10));
- }
- op = client.Get<NYdb::NQuery::TScriptExecutionOperation>(operationId);
- UNIT_ASSERT_C(op.GetValueSync().Status().IsSuccess(), op.GetValueSync().Status().GetStatus() << ":" << op.GetValueSync().Status().GetIssues().ToString());
- } while (!op.GetValueSync().Ready());
- return op.GetValueSync();
-}
Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
Y_UNIT_TEST(ExecuteScriptWithExternalTableResolve) {
@@ -187,10 +177,9 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
CreateBucketWithObject(bucket, object, TEST_CONTENT);
- auto kikimr = DefaultKikimrRunner();
- kikimr.GetTestServer().GetRuntime()->GetAppData(0).FeatureFlags.SetEnableExternalDataSources(true);
+ auto kikimr = MakeKikimrRunner(NYql::IHTTPGateway::Make());
- auto tc = kikimr.GetTableClient();
+ auto tc = kikimr->GetTableClient();
auto session = tc.CreateSession().GetValueSync().GetSession();
const TString query = fmt::format(R"(
CREATE EXTERNAL DATA SOURCE `{external_source}` WITH (
@@ -219,12 +208,12 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
SELECT * FROM `{external_table}`
)", "external_table"_a=externalTableName);
- auto db = kikimr.GetQueryClient();
+ auto db = kikimr->GetQueryClient();
auto scriptExecutionOperation = db.ExecuteScript(sql).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(scriptExecutionOperation.Status().GetStatus(), EStatus::SUCCESS, scriptExecutionOperation.Status().GetIssues().ToString());
UNIT_ASSERT(scriptExecutionOperation.Metadata().ExecutionId);
- NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr.GetDriver());
+ NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr->GetDriver());
UNIT_ASSERT_EQUAL(readyOp.Metadata().ExecStatus, EExecStatus::Completed);
TFetchScriptResultsResult results = db.FetchScriptResults(scriptExecutionOperation.Id(), 0).ExtractValueSync();
UNIT_ASSERT_C(results.IsSuccess(), results.GetIssues().ToString());
@@ -251,10 +240,9 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
CreateBucketWithObject(bucket, object, TEST_CONTENT);
- auto kikimr = DefaultKikimrRunner();
- kikimr.GetTestServer().GetRuntime()->GetAppData(0).FeatureFlags.SetEnableExternalDataSources(true);
+ auto kikimr = MakeKikimrRunner(NYql::IHTTPGateway::Make());
- auto tc = kikimr.GetTableClient();
+ auto tc = kikimr->GetTableClient();
auto session = tc.CreateSession().GetValueSync().GetSession();
const TString query = fmt::format(R"(
CREATE EXTERNAL DATA SOURCE `{external_source}` WITH (
@@ -282,7 +270,7 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
SELECT * FROM `{external_table}`
)", "external_table"_a=externalTableName);
- auto db = kikimr.GetQueryClient();
+ auto db = kikimr->GetQueryClient();
auto executeQueryIterator = db.StreamExecuteQuery(sql, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
size_t currentRow = 0;
@@ -323,10 +311,9 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
CreateBucketWithObject(bucket, object, TEST_CONTENT);
- auto kikimr = DefaultKikimrRunner();
- kikimr.GetTestServer().GetRuntime()->GetAppData(0).FeatureFlags.SetEnableExternalDataSources(true);
+ auto kikimr = MakeKikimrRunner(NYql::IHTTPGateway::Make());
- auto tc = kikimr.GetTableClient();
+ auto tc = kikimr->GetTableClient();
auto session = tc.CreateSession().GetValueSync().GetSession();
const TString query = fmt::format(R"(
CREATE EXTERNAL DATA SOURCE `{external_source}` WITH (
@@ -356,17 +343,17 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
SELECT * FROM `{external_table}`
)", "external_table"_a=externalTableName);
- auto db = kikimr.GetQueryClient();
+ auto db = kikimr->GetQueryClient();
auto scriptExecutionOperation = db.ExecuteScript(sql, settings).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(scriptExecutionOperation.Status().GetStatus(), EStatus::SUCCESS, scriptExecutionOperation.Status().GetIssues().ToString());
UNIT_ASSERT(scriptExecutionOperation.Metadata().ExecutionId);
- NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr.GetDriver());
+ NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr->GetDriver());
UNIT_ASSERT_EQUAL(readyOp.Metadata().ExecStatus, EExecStatus::Completed);
UNIT_ASSERT_EQUAL(readyOp.Metadata().ExecStats.compilation().from_cache(), false);
scriptExecutionOperation = db.ExecuteScript(sql, settings).ExtractValueSync();
- readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr.GetDriver());
+ readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr->GetDriver());
UNIT_ASSERT_EQUAL(readyOp.Metadata().ExecStatus, EExecStatus::Completed);
UNIT_ASSERT_EQUAL(readyOp.Metadata().ExecStats.compilation().from_cache(), false);
}
@@ -378,9 +365,8 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
CreateBucketWithObject(bucket, "test_object", TEST_CONTENT);
- auto kikimr = DefaultKikimrRunner();
- kikimr.GetTestServer().GetRuntime()->GetAppData(0).FeatureFlags.SetEnableExternalDataSources(true);
- auto tc = kikimr.GetTableClient();
+ auto kikimr = MakeKikimrRunner(NYql::IHTTPGateway::Make());
+ auto tc = kikimr->GetTableClient();
auto session = tc.CreateSession().GetValueSync().GetSession();
const TString query = fmt::format(R"(
CREATE EXTERNAL DATA SOURCE `{external_source}` WITH (
@@ -393,7 +379,7 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
);
auto result = session.ExecuteSchemeQuery(query).GetValueSync();
UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
- auto db = kikimr.GetQueryClient();
+ auto db = kikimr->GetQueryClient();
auto scriptExecutionOperation = db.ExecuteScript(fmt::format(R"(
SELECT * FROM `{external_source}`.`/` WITH (
format="json_each_row",
@@ -406,7 +392,7 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
UNIT_ASSERT_VALUES_EQUAL_C(scriptExecutionOperation.Status().GetStatus(), EStatus::SUCCESS, scriptExecutionOperation.Status().GetIssues().ToString());
UNIT_ASSERT(scriptExecutionOperation.Metadata().ExecutionId);
- NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr.GetDriver());
+ NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr->GetDriver());
UNIT_ASSERT_EQUAL(readyOp.Metadata().ExecStatus, EExecStatus::Completed);
TFetchScriptResultsResult results = db.FetchScriptResults(scriptExecutionOperation.Id(), 0).ExtractValueSync();
UNIT_ASSERT_C(results.IsSuccess(), results.GetIssues().ToString());
@@ -430,9 +416,8 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
CreateBucketWithObject(bucket, "test_object", TEST_CONTENT);
- auto kikimr = DefaultKikimrRunner();
- kikimr.GetTestServer().GetRuntime()->GetAppData(0).FeatureFlags.SetEnableExternalDataSources(true);
- auto tc = kikimr.GetTableClient();
+ auto kikimr = MakeKikimrRunner(NYql::IHTTPGateway::Make());
+ auto tc = kikimr->GetTableClient();
auto session = tc.CreateSession().GetValueSync().GetSession();
{
const TString query = fmt::format(R"(
@@ -465,7 +450,7 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
auto result = session.ExecuteDataQuery(query, NYdb::NTable::TTxControl::BeginTx(NYdb::NTable::TTxSettings::SerializableRW()).CommitTx()).GetValueSync();
UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
}
- auto db = kikimr.GetQueryClient();
+ auto db = kikimr->GetQueryClient();
auto scriptExecutionOperation = db.ExecuteScript(fmt::format(R"(
SELECT t1.key as key, t1.value as v1, t2.value as v2 FROM `{external_source}`.`/` WITH (
format="json_each_row",
@@ -480,7 +465,7 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
UNIT_ASSERT_VALUES_EQUAL_C(scriptExecutionOperation.Status().GetStatus(), EStatus::SUCCESS, scriptExecutionOperation.Status().GetIssues().ToString());
UNIT_ASSERT(scriptExecutionOperation.Metadata().ExecutionId);
- NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr.GetDriver());
+ NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr->GetDriver());
UNIT_ASSERT_EQUAL(readyOp.Metadata().ExecStatus, EExecStatus::Completed);
TFetchScriptResultsResult results = db.FetchScriptResults(scriptExecutionOperation.Id(), 0).ExtractValueSync();
UNIT_ASSERT_C(results.IsSuccess(), results.GetIssues().ToString());
@@ -507,10 +492,9 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
CreateBucketWithObject(bucket, object, TEST_CONTENT);
- auto kikimr = DefaultKikimrRunner();
- kikimr.GetTestServer().GetRuntime()->GetAppData(0).FeatureFlags.SetEnableExternalDataSources(true);
+ auto kikimr = MakeKikimrRunner(NYql::IHTTPGateway::Make());
- auto tc = kikimr.GetTableClient();
+ auto tc = kikimr->GetTableClient();
auto session = tc.CreateSession().GetValueSync().GetSession();
const TString query = fmt::format(R"(
CREATE EXTERNAL DATA SOURCE `{external_source}` WITH (
@@ -534,18 +518,18 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
auto result = session.ExecuteSchemeQuery(query).GetValueSync();
UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
- auto db = kikimr.GetQueryClient();
+ auto db = kikimr->GetQueryClient();
auto scriptExecutionOperation = db.ExecuteScript(fmt::format(R"(
PRAGMA s3.JsonListSizeLimit = "10";
PRAGMA s3.SourceCoroActor = 'true';
- PRAGMA Kikimr.OptEnableOlapPushdown = "false";
+ PRAGMA kikimr.OptEnableOlapPushdown = "false";
SELECT * FROM `{external_table}`
)", "external_table"_a=externalTableName)).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(scriptExecutionOperation.Status().GetStatus(), EStatus::SUCCESS, scriptExecutionOperation.Status().GetIssues().ToString());
UNIT_ASSERT(scriptExecutionOperation.Metadata().ExecutionId);
- NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr.GetDriver());
- UNIT_ASSERT_EQUAL(readyOp.Metadata().ExecStatus, EExecStatus::Completed);
+ NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr->GetDriver());
+ UNIT_ASSERT_C(readyOp.Metadata().ExecStatus == EExecStatus::Completed, readyOp.Status().GetIssues().ToString());
TFetchScriptResultsResult results = db.FetchScriptResults(scriptExecutionOperation.Id(), 0).ExtractValueSync();
UNIT_ASSERT_C(results.IsSuccess(), results.GetIssues().ToString());
@@ -570,9 +554,8 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
CreateBucketWithObject(bucket, "test_object", TEST_CONTENT);
- auto kikimr = DefaultKikimrRunner();
- kikimr.GetTestServer().GetRuntime()->GetAppData(0).FeatureFlags.SetEnableExternalDataSources(true);
- auto tc = kikimr.GetTableClient();
+ auto kikimr = MakeKikimrRunner(NYql::IHTTPGateway::Make());
+ auto tc = kikimr->GetTableClient();
auto session = tc.CreateSession().GetValueSync().GetSession();
{
const TString query = fmt::format(R"(
@@ -605,11 +588,11 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
auto result = session.ExecuteDataQuery(query, NYdb::NTable::TTxControl::BeginTx(NYdb::NTable::TTxSettings::SerializableRW()).CommitTx()).GetValueSync();
UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
}
- auto db = kikimr.GetQueryClient();
+ auto db = kikimr->GetQueryClient();
auto scriptExecutionOperation = db.ExecuteScript(fmt::format(R"(
PRAGMA s3.JsonListSizeLimit = "10";
PRAGMA s3.SourceCoroActor = 'true';
- PRAGMA Kikimr.OptEnableOlapPushdown = "false";
+ PRAGMA kikimr.OptEnableOlapPushdown = "false";
SELECT t1.key as key, t1.value as v1, t2.value as v2 FROM `{external_source}`.`/` WITH (
format="json_each_row",
schema(
@@ -623,8 +606,8 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
UNIT_ASSERT_VALUES_EQUAL_C(scriptExecutionOperation.Status().GetStatus(), EStatus::SUCCESS, scriptExecutionOperation.Status().GetIssues().ToString());
UNIT_ASSERT(scriptExecutionOperation.Metadata().ExecutionId);
- NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr.GetDriver());
- UNIT_ASSERT_EQUAL(readyOp.Metadata().ExecStatus, EExecStatus::Completed);
+ NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr->GetDriver());
+ UNIT_ASSERT_C(readyOp.Metadata().ExecStatus == EExecStatus::Completed, readyOp.Status().GetIssues().ToString());
TFetchScriptResultsResult results = db.FetchScriptResults(scriptExecutionOperation.Id(), 0).ExtractValueSync();
UNIT_ASSERT_C(results.IsSuccess(), results.GetIssues().ToString());
@@ -648,9 +631,8 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
CreateBucketWithObject(bucket, "test_object", TEST_CONTENT);
- auto kikimr = DefaultKikimrRunner();
- kikimr.GetTestServer().GetRuntime()->GetAppData(0).FeatureFlags.SetEnableExternalDataSources(true);
- auto tc = kikimr.GetTableClient();
+ auto kikimr = MakeKikimrRunner(NYql::IHTTPGateway::Make());
+ auto tc = kikimr->GetTableClient();
auto session = tc.CreateSession().GetValueSync().GetSession();
const TString query = fmt::format(R"(
CREATE EXTERNAL DATA SOURCE `{external_source}` WITH (
@@ -663,7 +645,7 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
);
auto result = session.ExecuteSchemeQuery(query).GetValueSync();
UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
- auto db = kikimr.GetQueryClient();
+ auto db = kikimr->GetQueryClient();
auto scriptExecutionOperation = db.ExecuteScript(fmt::format(R"(
SELECT * FROM `{external_source}`.`*` WITH (
format="json_each_row",
@@ -676,7 +658,7 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
UNIT_ASSERT_VALUES_EQUAL_C(scriptExecutionOperation.Status().GetStatus(), EStatus::SUCCESS, scriptExecutionOperation.Status().GetIssues().ToString());
UNIT_ASSERT(scriptExecutionOperation.Metadata().ExecutionId);
- NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr.GetDriver());
+ NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr->GetDriver());
UNIT_ASSERT_EQUAL(readyOp.Metadata().ExecStatus, EExecStatus::Completed);
TFetchScriptResultsResult results = db.FetchScriptResults(scriptExecutionOperation.Id(), 0).ExtractValueSync();
UNIT_ASSERT_C(results.IsSuccess(), results.GetIssues().ToString());
@@ -701,20 +683,12 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
CreateBucketWithObject(bucket, object, TEST_CONTENT);
- NKikimrConfig::TAppConfig appConfig;
- appConfig.MutableTableServiceConfig()->SetBindingsMode(mode);
-
- NKikimrConfig::TFeatureFlags featureFlags;
- featureFlags.SetEnableExternalDataSources(true);
- featureFlags.SetEnableScriptExecutionOperations(true);
-
- auto settings = TKikimrSettings()
- .SetAppConfig(appConfig)
- .SetFeatureFlags(featureFlags);
+ auto appConfig = std::make_optional<NKikimrConfig::TAppConfig>();
+ appConfig->MutableTableServiceConfig()->SetBindingsMode(mode);
- TKikimrRunner kikimr(settings);
+ auto kikimr = MakeKikimrRunner(NYql::IHTTPGateway::Make(), nullptr, appConfig);
- auto tc = kikimr.GetTableClient();
+ auto tc = kikimr->GetTableClient();
auto session = tc.CreateSession().GetValueSync().GetSession();
const TString query = fmt::format(R"(
CREATE EXTERNAL DATA SOURCE `{external_source}` WITH (
@@ -742,12 +716,12 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
SELECT * FROM bindings.`{external_table}`
)", "external_table"_a=externalTableName);
- auto db = kikimr.GetQueryClient();
+ auto db = kikimr->GetQueryClient();
auto scriptExecutionOperation = db.ExecuteScript(sql).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(scriptExecutionOperation.Status().GetStatus(), EStatus::SUCCESS, scriptExecutionOperation.Status().GetIssues().ToString());
UNIT_ASSERT(scriptExecutionOperation.Metadata().ExecutionId);
- NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr.GetDriver());
+ NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr->GetDriver());
TFetchScriptResultsResult results(TStatus(EStatus::SUCCESS, {}));
if (readyOp.Metadata().ExecStatus == EExecStatus::Completed) {
results = db.FetchScriptResults(scriptExecutionOperation.Id(), 0).ExtractValueSync();
@@ -824,10 +798,9 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
CreateBucket(writeBucket, s3Client);
}
- auto kikimr = DefaultKikimrRunner();
- kikimr.GetTestServer().GetRuntime()->GetAppData(0).FeatureFlags.SetEnableExternalDataSources(true);
+ auto kikimr = MakeKikimrRunner(NYql::IHTTPGateway::Make());
- auto tc = kikimr.GetTableClient();
+ auto tc = kikimr->GetTableClient();
auto session = tc.CreateSession().GetValueSync().GetSession();
const TString query = fmt::format(R"(
CREATE EXTERNAL DATA SOURCE `{read_source}` WITH (
@@ -877,7 +850,7 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
"read_table"_a=readTableName,
"write_table"_a = writeTableName);
- auto db = kikimr.GetQueryClient();
+ auto db = kikimr->GetQueryClient();
auto resultFuture = db.ExecuteQuery(sql, NYdb::NQuery::TTxControl::BeginTx().CommitTx());
resultFuture.Wait();
UNIT_ASSERT_C(resultFuture.GetValueSync().IsSuccess(), resultFuture.GetValueSync().GetIssues().ToString());
@@ -917,10 +890,9 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
CreateBucket(writeBucket, s3Client);
}
- auto kikimr = DefaultKikimrRunner();
- kikimr.GetTestServer().GetRuntime()->GetAppData(0).FeatureFlags.SetEnableExternalDataSources(true);
+ auto kikimr = MakeKikimrRunner(NYql::IHTTPGateway::Make());
- auto tc = kikimr.GetTableClient();
+ auto tc = kikimr->GetTableClient();
auto session = tc.CreateSession().GetValueSync().GetSession();
{
const TString query = fmt::format(R"(
@@ -963,7 +935,7 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
auto result = session.ExecuteDataQuery(query, NYdb::NTable::TTxControl::BeginTx(NYdb::NTable::TTxSettings::SerializableRW()).CommitTx()).GetValueSync();
UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
}
- auto db = kikimr.GetQueryClient();
+ auto db = kikimr->GetQueryClient();
ExecuteInsertQuery(db, writeTableName, readTableName, false);
ExecuteInsertQuery(db, writeTableName, readTableName, true);
{
@@ -1005,10 +977,9 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
UploadObject(bucket, keysObject, TEST_CONTENT_KEYS, s3Client);
}
- auto kikimr = DefaultKikimrRunner();
- kikimr.GetTestServer().GetRuntime()->GetAppData(0).FeatureFlags.SetEnableExternalDataSources(true);
+ auto kikimr = MakeKikimrRunner(NYql::IHTTPGateway::Make());
- auto tc = kikimr.GetTableClient();
+ auto tc = kikimr->GetTableClient();
auto session = tc.CreateSession().GetValueSync().GetSession();
const TString query = fmt::format(R"(
CREATE EXTERNAL DATA SOURCE `{data_source}` WITH (
@@ -1053,7 +1024,7 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
"data_table"_a = dataTable,
"keys_table"_a = keysTable);
- auto db = kikimr.GetQueryClient();
+ auto db = kikimr->GetQueryClient();
auto resultFuture = db.ExecuteQuery(sql, NYdb::NQuery::TTxControl::BeginTx().CommitTx());
resultFuture.Wait();
UNIT_ASSERT_C(resultFuture.GetValueSync().IsSuccess(), resultFuture.GetValueSync().GetIssues().ToString());
@@ -1075,10 +1046,9 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
CreateBucketWithObject(bucket, object, content);
- auto kikimr = DefaultKikimrRunner();
- kikimr.GetTestServer().GetRuntime()->GetAppData(0).FeatureFlags.SetEnableExternalDataSources(true);
+ auto kikimr = MakeKikimrRunner(NYql::IHTTPGateway::Make());
- auto tc = kikimr.GetTableClient();
+ auto tc = kikimr->GetTableClient();
auto session = tc.CreateSession().GetValueSync().GetSession();
const TString query = fmt::format(R"(
CREATE EXTERNAL DATA SOURCE `{external_source}` WITH (
@@ -1104,7 +1074,7 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
auto result = session.ExecuteSchemeQuery(query).GetValueSync();
UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
- auto db = kikimr.GetQueryClient();
+ auto db = kikimr->GetQueryClient();
const TString sql = fmt::format(R"(
SELECT * FROM `{external_table}`
)", "external_table"_a = externalTableName);
@@ -1112,7 +1082,7 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
auto scriptExecutionOperation = db.ExecuteScript(sql).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(scriptExecutionOperation.Status().GetStatus(), EStatus::SUCCESS, scriptExecutionOperation.Status().GetIssues().ToString());
- NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr.GetDriver());
+ NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr->GetDriver());
UNIT_ASSERT_EQUAL(readyOp.Metadata().ExecStatus, EExecStatus::Completed);
TFetchScriptResultsResult results = db.FetchScriptResults(scriptExecutionOperation.Id(), 0).ExtractValueSync();
UNIT_ASSERT_C(results.IsSuccess(), results.GetIssues().ToString());
diff --git a/ydb/core/kqp/ut/federated_query/s3/ya.make b/ydb/core/kqp/ut/federated_query/s3/ya.make
new file mode 100644
index 00000000000..e043ec191fb
--- /dev/null
+++ b/ydb/core/kqp/ut/federated_query/s3/ya.make
@@ -0,0 +1,28 @@
+UNITTEST_FOR(ydb/core/kqp)
+
+IF (WITH_VALGRIND)
+ TIMEOUT(3600)
+ SIZE(LARGE)
+ TAG(ya:fat)
+ELSE()
+ TIMEOUT(600)
+ SIZE(MEDIUM)
+ENDIF()
+
+SRCS(
+ kqp_federated_query_ut.cpp
+)
+
+PEERDIR(
+ contrib/libs/aws-sdk-cpp/aws-cpp-sdk-s3
+ ydb/core/kqp/ut/common
+ ydb/core/kqp/ut/federated_query/common
+ ydb/library/yql/sql/pg_dummy
+ ydb/public/sdk/cpp/client/ydb_types/operation
+)
+
+YQL_LAST_ABI_VERSION()
+
+INCLUDE(${ARCADIA_ROOT}/ydb/tests/tools/s3_recipe/recipe.inc)
+
+END()
diff --git a/ydb/core/kqp/ut/federated_query/ya.make b/ydb/core/kqp/ut/federated_query/ya.make
index dd07aecaf6b..abe2eb1010f 100644
--- a/ydb/core/kqp/ut/federated_query/ya.make
+++ b/ydb/core/kqp/ut/federated_query/ya.make
@@ -1,28 +1,7 @@
-UNITTEST_FOR(ydb/core/kqp)
-
-IF (WITH_VALGRIND)
- TIMEOUT(3600)
- SIZE(LARGE)
- TAG(ya:fat)
-ELSE()
- TIMEOUT(600)
- SIZE(MEDIUM)
-ENDIF()
-
-SRCS(
- kqp_federated_query_ut.cpp
+RECURSE_FOR_TESTS(
+ s3
)
-PEERDIR(
- contrib/libs/aws-sdk-cpp/aws-cpp-sdk-s3
- ydb/core/kqp/ut/common
- ydb/library/yql/sql/pg_dummy
- ydb/public/sdk/cpp/client/ydb_operation
- ydb/public/sdk/cpp/client/ydb_types/operation
+RECURSE(
+ common
)
-
-YQL_LAST_ABI_VERSION()
-
-INCLUDE(${ARCADIA_ROOT}/ydb/tests/tools/s3_recipe/recipe.inc)
-
-END()
diff --git a/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp b/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp
index 16d0ca327b0..6ebd3e68b46 100644
--- a/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp
+++ b/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp
@@ -52,8 +52,9 @@ TIntrusivePtr<IKqpHost> CreateKikimrQueryProcessor(TIntrusivePtr<IKqpGateway> ga
UNIT_ASSERT(TryParseFromTextFormat(defaultSettingsStream, defaultSettings));
kikimrConfig->Init(defaultSettings.GetDefaultSettings(), cluster, settings, true);
- return NKqp::CreateKqpHost(gateway, cluster, "/Root", kikimrConfig, moduleResolver, NYql::IHTTPGateway::Make(), funcRegistry,
- keepConfigChanges);
+ auto federatedQuerySetup = std::make_optional<TKqpFederatedQuerySetup>({NYql::IHTTPGateway::Make(), nullptr, nullptr, nullptr});
+ return NKqp::CreateKqpHost(gateway, cluster, "/Root", kikimrConfig, moduleResolver,
+ federatedQuerySetup, funcRegistry, funcRegistry, keepConfigChanges);
}
NYql::NNodes::TExprBase GetExpr(const TString& ast, NYql::TExprContext& ctx, NYql::IModuleResolver* moduleResolver) {
diff --git a/ydb/core/kqp/ya.make b/ydb/core/kqp/ya.make
index e34ec78a290..0c70ffb04c7 100644
--- a/ydb/core/kqp/ya.make
+++ b/ydb/core/kqp/ya.make
@@ -61,6 +61,7 @@ RECURSE(
counters
executer_actor
expr_nodes
+ federated_query
gateway
host
node_service
diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto
index ec4634efd6d..30fb7e37a87 100644
--- a/ydb/core/protos/config.proto
+++ b/ydb/core/protos/config.proto
@@ -1391,6 +1391,8 @@ message TQueryServiceConfig {
optional NYql.TS3GatewayConfig S3 = 6;
optional NYql.THttpGatewayConfig HttpGateway = 7;
optional NYql.TGenericConnectorConfig Connector = 8;
+ optional string MdbGateway = 9;
+ optional bool MdbTransformHost = 10;
}
// Config describes immediate controls and allows
diff --git a/ydb/core/testlib/CMakeLists.darwin-x86_64.txt b/ydb/core/testlib/CMakeLists.darwin-x86_64.txt
index c536df6d5da..196f5b664a1 100644
--- a/ydb/core/testlib/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/testlib/CMakeLists.darwin-x86_64.txt
@@ -47,6 +47,7 @@ target_link_libraries(ydb-core-testlib PUBLIC
core-kesus-tablet
ydb-core-keyvalue
ydb-core-kqp
+ core-kqp-federated_query
ydb-core-metering
ydb-core-mind
core-mind-address_classification
diff --git a/ydb/core/testlib/CMakeLists.linux-aarch64.txt b/ydb/core/testlib/CMakeLists.linux-aarch64.txt
index 9ed91887756..80814daf462 100644
--- a/ydb/core/testlib/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/testlib/CMakeLists.linux-aarch64.txt
@@ -48,6 +48,7 @@ target_link_libraries(ydb-core-testlib PUBLIC
core-kesus-tablet
ydb-core-keyvalue
ydb-core-kqp
+ core-kqp-federated_query
ydb-core-metering
ydb-core-mind
core-mind-address_classification
diff --git a/ydb/core/testlib/CMakeLists.linux-x86_64.txt b/ydb/core/testlib/CMakeLists.linux-x86_64.txt
index 9ed91887756..80814daf462 100644
--- a/ydb/core/testlib/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/testlib/CMakeLists.linux-x86_64.txt
@@ -48,6 +48,7 @@ target_link_libraries(ydb-core-testlib PUBLIC
core-kesus-tablet
ydb-core-keyvalue
ydb-core-kqp
+ core-kqp-federated_query
ydb-core-metering
ydb-core-mind
core-mind-address_classification
diff --git a/ydb/core/testlib/CMakeLists.windows-x86_64.txt b/ydb/core/testlib/CMakeLists.windows-x86_64.txt
index c536df6d5da..196f5b664a1 100644
--- a/ydb/core/testlib/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/testlib/CMakeLists.windows-x86_64.txt
@@ -47,6 +47,7 @@ target_link_libraries(ydb-core-testlib PUBLIC
core-kesus-tablet
ydb-core-keyvalue
ydb-core-kqp
+ core-kqp-federated_query
ydb-core-metering
ydb-core-mind
core-mind-address_classification
diff --git a/ydb/core/testlib/test_client.cpp b/ydb/core/testlib/test_client.cpp
index da0510df57f..3385a8246d9 100644
--- a/ydb/core/testlib/test_client.cpp
+++ b/ydb/core/testlib/test_client.cpp
@@ -819,11 +819,11 @@ namespace Tests {
IActor* kqpProxyService = NKqp::CreateKqpProxyService(Settings->AppConfig.GetLogConfig(),
Settings->AppConfig.GetTableServiceConfig(),
- Settings->AppConfig.GetAuthConfig().GetTokenAccessorConfig(),
Settings->AppConfig.GetQueryServiceConfig(),
Settings->AppConfig.GetMetadataProviderConfig(),
TVector<NKikimrKqp::TKqpSetting>(Settings->KqpSettings),
- nullptr, std::move(kqpProxySharedResources));
+ nullptr, std::move(kqpProxySharedResources),
+ Settings->FederatedQuerySetupFactory);
TActorId kqpProxyServiceId = Runtime->Register(kqpProxyService, nodeIdx);
Runtime->RegisterService(NKqp::MakeKqpProxyID(Runtime->GetNodeId(nodeIdx)), kqpProxyServiceId, nodeIdx);
}
diff --git a/ydb/core/testlib/test_client.h b/ydb/core/testlib/test_client.h
index fe9e3f42b9b..d461d403d8a 100644
--- a/ydb/core/testlib/test_client.h
+++ b/ydb/core/testlib/test_client.h
@@ -22,6 +22,7 @@
#include <ydb/core/testlib/basics/appdata.h>
#include <ydb/core/protos/kesus.pb.h>
#include <ydb/core/kesus/tablet/events.h>
+#include <ydb/core/kqp/federated_query/kqp_federated_query_helpers.h>
#include <ydb/core/security/ticket_parser.h>
#include <ydb/core/base/grpc_service_factory.h>
#include <ydb/core/persqueue/actor_persqueue_client_iface.h>
@@ -137,6 +138,7 @@ namespace Tests {
bool EnableMetering = false;
TString MeteringFilePath;
TString AwsRegion;
+ NKqp::IKqpFederatedQuerySetupFactory::TPtr FederatedQuerySetupFactory = std::make_shared<NKqp::TKqpFederatedQuerySetupFactoryNoop>();
std::function<IActor*(const NKikimrProto::TAuthConfig&)> CreateTicketParser = NKikimr::CreateTicketParser;
std::shared_ptr<TGrpcServiceFactory> GrpcServiceFactory;
@@ -179,6 +181,7 @@ namespace Tests {
TServerSettings& SetChangesQueueBytesLimit(ui64 value) { ChangesQueueBytesLimit = value; return *this; }
TServerSettings& SetMeteringFilePath(const TString& path) { EnableMetering = true; MeteringFilePath = path; return *this; }
TServerSettings& SetAwsRegion(const TString& value) { AwsRegion = value; return *this; }
+ TServerSettings& SetFederatedQuerySetupFactory(NKqp::IKqpFederatedQuerySetupFactory::TPtr value) { FederatedQuerySetupFactory = value; return *this; }
TServerSettings& SetPersQueueGetReadSessionsInfoWorkerFactory(
std::shared_ptr<NKikimr::NMsgBusProxy::IPersQueueGetReadSessionsInfoWorkerFactory> factory
) {
diff --git a/ydb/core/testlib/ya.make b/ydb/core/testlib/ya.make
index 8f94d82c799..af028b07f5b 100644
--- a/ydb/core/testlib/ya.make
+++ b/ydb/core/testlib/ya.make
@@ -51,6 +51,7 @@ PEERDIR(
ydb/core/kesus/tablet
ydb/core/keyvalue
ydb/core/kqp
+ ydb/core/kqp/federated_query
ydb/core/metering
ydb/core/mind
ydb/core/mind/address_classification
diff --git a/ydb/core/tx/datashard/datashard_ut_kqp_errors.cpp b/ydb/core/tx/datashard/datashard_ut_kqp_errors.cpp
index 772d1c86b7e..ddf67987c24 100644
--- a/ydb/core/tx/datashard/datashard_ut_kqp_errors.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_kqp_errors.cpp
@@ -50,7 +50,7 @@ public:
Server = new TServer(serverSettings);
Runtime = Server->GetRuntime();
- Runtime->SetLogPriority(NKikimrServices::KQP_RESOURCE_MANAGER, NLog::PRI_DEBUG);
+ Runtime->SetLogPriority(NKikimrServices::KQP_RESOURCE_MANAGER, NActors::NLog::PRI_DEBUG);
TDispatchOptions rmReady;
rmReady.CustomFinalCondition = [this] {
@@ -65,7 +65,7 @@ public:
};
Runtime->DispatchEvents(rmReady);
- Runtime->SetLogPriority(NKikimrServices::KQP_RESOURCE_MANAGER, NLog::PRI_NOTICE);
+ Runtime->SetLogPriority(NKikimrServices::KQP_RESOURCE_MANAGER, NActors::NLog::PRI_NOTICE);
// Runtime->SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_DEBUG);
// Runtime->SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG);
Runtime->SetLogPriority(NKikimrServices::KQP_EXECUTER, NActors::NLog::PRI_TRACE);
diff --git a/ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver.h b/ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver.h
index 4a58980d9d0..aaf2750bad1 100644
--- a/ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver.h
+++ b/ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver.h
@@ -88,6 +88,8 @@ struct TDatabaseResolverResponse {
class IDatabaseAsyncResolver {
public:
+ using TPtr = std::shared_ptr<IDatabaseAsyncResolver>;
+
using TDatabaseAuthMap = THashMap<std::pair<TString, EDatabaseType>, NYql::TDatabaseAuth>;
virtual NThreading::TFuture<NYql::TDatabaseResolverResponse> ResolveIds(const TDatabaseAuthMap& ids) const = 0;