diff options
author | vitalyisaev <vitalyisaev@yandex-team.com> | 2023-09-01 12:24:13 +0300 |
---|---|---|
committer | vitalyisaev <vitalyisaev@yandex-team.com> | 2023-09-01 12:47:31 +0300 |
commit | 41fb59788d7929ed2ef5a9382669414db05ed34c (patch) | |
tree | 3f3038f01642a200609a6f457dd949b769735d27 | |
parent | cc871496ef02852bf6ca96470fa61c253940ddf4 (diff) | |
download | ydb-41fb59788d7929ed2ef5a9382669414db05ed34c.tar.gz |
YQ Connector: initialize Federated Query clients and actors in KQP
1. Появилась папка `ydb/core/kqp/federated_query`, где инкапсулировано всё необходимое для запуска Federated Query в KQP - HTTP и GRPC клиенты, там же создаются служебные акторы, такие как `DatabaseResolver`
2. Переходим на использование `TQueryServiceConfig`. Туда же перенесена ещё пара параметров из `FederatedQueryConfig`.
3. Юнит-тесты на S3 отрефакторены, выделена общая часть, которая будет использоваться в юнит-тестах на Generic провайдер.
108 files changed, 970 insertions, 401 deletions
diff --git a/ydb/core/driver_lib/run/CMakeLists.darwin-x86_64.txt b/ydb/core/driver_lib/run/CMakeLists.darwin-x86_64.txt index 7fe9caae62a..44e811c14f9 100644 --- a/ydb/core/driver_lib/run/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/driver_lib/run/CMakeLists.darwin-x86_64.txt @@ -68,6 +68,7 @@ target_link_libraries(run PUBLIC ydb-core-keyvalue ydb-core-kafka_proxy ydb-core-kqp + core-kqp-federated_query core-kqp-rm_service ydb-core-load_test ydb-core-local_pgwire diff --git a/ydb/core/driver_lib/run/CMakeLists.linux-aarch64.txt b/ydb/core/driver_lib/run/CMakeLists.linux-aarch64.txt index 5d41e085cfa..86b089d2ee7 100644 --- a/ydb/core/driver_lib/run/CMakeLists.linux-aarch64.txt +++ b/ydb/core/driver_lib/run/CMakeLists.linux-aarch64.txt @@ -69,6 +69,7 @@ target_link_libraries(run PUBLIC ydb-core-keyvalue ydb-core-kafka_proxy ydb-core-kqp + core-kqp-federated_query core-kqp-rm_service ydb-core-load_test ydb-core-local_pgwire diff --git a/ydb/core/driver_lib/run/CMakeLists.linux-x86_64.txt b/ydb/core/driver_lib/run/CMakeLists.linux-x86_64.txt index 5d41e085cfa..86b089d2ee7 100644 --- a/ydb/core/driver_lib/run/CMakeLists.linux-x86_64.txt +++ b/ydb/core/driver_lib/run/CMakeLists.linux-x86_64.txt @@ -69,6 +69,7 @@ target_link_libraries(run PUBLIC ydb-core-keyvalue ydb-core-kafka_proxy ydb-core-kqp + core-kqp-federated_query core-kqp-rm_service ydb-core-load_test ydb-core-local_pgwire diff --git a/ydb/core/driver_lib/run/CMakeLists.windows-x86_64.txt b/ydb/core/driver_lib/run/CMakeLists.windows-x86_64.txt index 7fe9caae62a..44e811c14f9 100644 --- a/ydb/core/driver_lib/run/CMakeLists.windows-x86_64.txt +++ b/ydb/core/driver_lib/run/CMakeLists.windows-x86_64.txt @@ -68,6 +68,7 @@ target_link_libraries(run PUBLIC ydb-core-keyvalue ydb-core-kafka_proxy ydb-core-kqp + core-kqp-federated_query core-kqp-rm_service ydb-core-load_test ydb-core-local_pgwire diff --git a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp index 44af92665df..10383b4a19d 100644 --- a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp +++ b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp @@ -2105,8 +2105,10 @@ void TKqpServiceInitializer::InitializeServices(NActors::TActorSystemSetup* setu GlobalObjects.AddGlobalObject(std::make_shared<NYql::NLog::YqlLoggerScope>( new NYql::NLog::TTlsLogBackend(new TNullLogBackend()))); - auto proxy = NKqp::CreateKqpProxyService(Config.GetLogConfig(), Config.GetTableServiceConfig(), Config.GetAuthConfig().GetTokenAccessorConfig(), - Config.GetQueryServiceConfig(), Config.GetMetadataProviderConfig(), std::move(settings), Factories->QueryReplayBackendFactory, std::move(kqpProxySharedResources)); + auto proxy = NKqp::CreateKqpProxyService(Config.GetLogConfig(), Config.GetTableServiceConfig(), + Config.GetQueryServiceConfig(), Config.GetMetadataProviderConfig(), std::move(settings), Factories->QueryReplayBackendFactory, std::move(kqpProxySharedResources), + NKqp::MakeKqpFederatedQuerySetupFactory(setup, appData, Config) + ); setup->LocalServices.push_back(std::make_pair( NKqp::MakeKqpProxyID(NodeId), TActorSetupCmd(proxy, TMailboxType::HTSwap, appData->UserPoolId))); diff --git a/ydb/core/driver_lib/run/ya.make b/ydb/core/driver_lib/run/ya.make index 4635d10cfce..14240062100 100644 --- a/ydb/core/driver_lib/run/ya.make +++ b/ydb/core/driver_lib/run/ya.make @@ -83,6 +83,7 @@ PEERDIR( ydb/core/keyvalue ydb/core/kafka_proxy ydb/core/kqp + ydb/core/kqp/federated_query ydb/core/kqp/rm_service ydb/core/load_test ydb/core/local_pgwire diff --git a/ydb/core/fq/libs/actors/database_resolver.cpp b/ydb/core/fq/libs/actors/database_resolver.cpp index 9591ff9b51d..0ebbdc0e653 100644 --- a/ydb/core/fq/libs/actors/database_resolver.cpp +++ b/ydb/core/fq/libs/actors/database_resolver.cpp @@ -261,7 +261,6 @@ public: TString endpoint; TVector<TString> aliveHosts; for (const auto& host : databaseInfo.GetMap().at("hosts").GetArraySafe()) { - // all host services must be alive bool alive = true; for (const auto& service: host.GetMap().at("services").GetArraySafe()) { @@ -397,4 +396,8 @@ NActors::IActor* CreateDatabaseResolver(NActors::TActorId httpProxy, ISecuredSer return new TDatabaseResolver(httpProxy, credentialsFactory); } +NActors::TActorId MakeDatabaseResolverActorId() { + return NActors::TActorId(0, "DBRESOLVER"); +} + } /* namespace NFq */ diff --git a/ydb/core/fq/libs/actors/database_resolver.h b/ydb/core/fq/libs/actors/database_resolver.h index 21035416820..f7ebd421a19 100644 --- a/ydb/core/fq/libs/actors/database_resolver.h +++ b/ydb/core/fq/libs/actors/database_resolver.h @@ -7,5 +7,6 @@ namespace NFq { NActors::IActor* CreateDatabaseResolver(NActors::TActorId httpProxy, NYql::ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory); +NActors::TActorId MakeDatabaseResolverActorId(); } /* namespace NFq */ diff --git a/ydb/core/fq/libs/actors/run_actor.cpp b/ydb/core/fq/libs/actors/run_actor.cpp index 7fea6502a66..689d17c3da7 100644 --- a/ydb/core/fq/libs/actors/run_actor.cpp +++ b/ydb/core/fq/libs/actors/run_actor.cpp @@ -59,7 +59,6 @@ #include <ydb/core/fq/libs/control_plane_storage/events/events.h> #include <ydb/core/fq/libs/control_plane_storage/util.h> #include <ydb/core/fq/libs/db_id_async_resolver_impl/db_async_resolver_impl.h> -#include <ydb/core/fq/libs/db_id_async_resolver_impl/mdb_endpoint_generator.h> #include <ydb/core/fq/libs/gateway/empty_gateway.h> #include <ydb/core/fq/libs/private_client/events.h> #include <ydb/core/fq/libs/private_client/private_client.h> @@ -1832,7 +1831,7 @@ private: Params.DatabaseResolver, Params.Config.GetCommon().GetYdbMvpCloudEndpoint(), Params.Config.GetCommon().GetMdbGateway(), - NFq::MakeMdbEndpointGeneratorGeneric(Params.Config.GetCommon().GetMdbTransformHost()), + Params.MdbEndpointGenerator, Params.QueryId); { // TBD: move init to better place diff --git a/ydb/core/fq/libs/compute/common/CMakeLists.darwin-x86_64.txt b/ydb/core/fq/libs/compute/common/CMakeLists.darwin-x86_64.txt index df13fd872f3..f559b414c0e 100644 --- a/ydb/core/fq/libs/compute/common/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/fq/libs/compute/common/CMakeLists.darwin-x86_64.txt @@ -16,6 +16,7 @@ target_link_libraries(libs-compute-common PUBLIC contrib-libs-cxxsupp yutil libs-config-protos + fq-libs-db_id_async_resolver_impl fq-libs-grpc fq-libs-shared_resources providers-dq-provider diff --git a/ydb/core/fq/libs/compute/common/CMakeLists.linux-aarch64.txt b/ydb/core/fq/libs/compute/common/CMakeLists.linux-aarch64.txt index db45ab0353d..387af424f08 100644 --- a/ydb/core/fq/libs/compute/common/CMakeLists.linux-aarch64.txt +++ b/ydb/core/fq/libs/compute/common/CMakeLists.linux-aarch64.txt @@ -17,6 +17,7 @@ target_link_libraries(libs-compute-common PUBLIC contrib-libs-cxxsupp yutil libs-config-protos + fq-libs-db_id_async_resolver_impl fq-libs-grpc fq-libs-shared_resources providers-dq-provider diff --git a/ydb/core/fq/libs/compute/common/CMakeLists.linux-x86_64.txt b/ydb/core/fq/libs/compute/common/CMakeLists.linux-x86_64.txt index db45ab0353d..387af424f08 100644 --- a/ydb/core/fq/libs/compute/common/CMakeLists.linux-x86_64.txt +++ b/ydb/core/fq/libs/compute/common/CMakeLists.linux-x86_64.txt @@ -17,6 +17,7 @@ target_link_libraries(libs-compute-common PUBLIC contrib-libs-cxxsupp yutil libs-config-protos + fq-libs-db_id_async_resolver_impl fq-libs-grpc fq-libs-shared_resources providers-dq-provider diff --git a/ydb/core/fq/libs/compute/common/CMakeLists.windows-x86_64.txt b/ydb/core/fq/libs/compute/common/CMakeLists.windows-x86_64.txt index df13fd872f3..f559b414c0e 100644 --- a/ydb/core/fq/libs/compute/common/CMakeLists.windows-x86_64.txt +++ b/ydb/core/fq/libs/compute/common/CMakeLists.windows-x86_64.txt @@ -16,6 +16,7 @@ target_link_libraries(libs-compute-common PUBLIC contrib-libs-cxxsupp yutil libs-config-protos + fq-libs-db_id_async_resolver_impl fq-libs-grpc fq-libs-shared_resources providers-dq-provider diff --git a/ydb/core/fq/libs/compute/common/run_actor_params.cpp b/ydb/core/fq/libs/compute/common/run_actor_params.cpp index b621f895c81..3bd287188fe 100644 --- a/ydb/core/fq/libs/compute/common/run_actor_params.cpp +++ b/ydb/core/fq/libs/compute/common/run_actor_params.cpp @@ -1,5 +1,7 @@ #include "run_actor_params.h" +#include <ydb/core/fq/libs/db_id_async_resolver_impl/mdb_endpoint_generator.h> + namespace NFq { using namespace NActors; @@ -71,6 +73,7 @@ TRunActorParams::TRunActorParams( , Scope(scope) , AuthToken(authToken) , DatabaseResolver(databaseResolver) + , MdbEndpointGenerator(NFq::MakeMdbEndpointGeneratorGeneric(config.GetCommon().GetMdbTransformHost())) , QueryId(queryId) , UserId(userId) , Owner(owner) diff --git a/ydb/core/fq/libs/compute/common/run_actor_params.h b/ydb/core/fq/libs/compute/common/run_actor_params.h index f3c1f54dbe1..8a5a7913f7d 100644 --- a/ydb/core/fq/libs/compute/common/run_actor_params.h +++ b/ydb/core/fq/libs/compute/common/run_actor_params.h @@ -98,6 +98,7 @@ struct TRunActorParams { // TODO2 : Change name const NYdb::NFq::TScope Scope; const TString AuthToken; const NActors::TActorId DatabaseResolver; + const NYql::IMdbEndpointGenerator::TPtr MdbEndpointGenerator; const TString QueryId; const TString UserId; const TString Owner; diff --git a/ydb/core/fq/libs/compute/common/ya.make b/ydb/core/fq/libs/compute/common/ya.make index a00549a2742..ebb41b1066a 100644 --- a/ydb/core/fq/libs/compute/common/ya.make +++ b/ydb/core/fq/libs/compute/common/ya.make @@ -8,6 +8,7 @@ SRCS( PEERDIR( ydb/core/fq/libs/config/protos + ydb/core/fq/libs/db_id_async_resolver_impl ydb/core/fq/libs/grpc ydb/core/fq/libs/shared_resources ydb/library/yql/providers/dq/provider diff --git a/ydb/core/fq/libs/db_id_async_resolver_impl/db_async_resolver_impl.cpp b/ydb/core/fq/libs/db_id_async_resolver_impl/db_async_resolver_impl.cpp index 97b9f1f40ab..814040847d3 100644 --- a/ydb/core/fq/libs/db_id_async_resolver_impl/db_async_resolver_impl.cpp +++ b/ydb/core/fq/libs/db_id_async_resolver_impl/db_async_resolver_impl.cpp @@ -1,6 +1,7 @@ #include "db_async_resolver_impl.h" #include <library/cpp/actors/core/actorsystem.h> +#include <ydb/core/fq/libs/events/events.h> namespace NFq { using namespace NThreading; @@ -10,13 +11,13 @@ TDatabaseAsyncResolverImpl::TDatabaseAsyncResolverImpl( const NActors::TActorId& recipient, const TString& ydbMvpEndpoint, const TString& mdbGateway, - NYql::IMdbEndpointGenerator::TPtr&& mdbEndpointGenerator, + const NYql::IMdbEndpointGenerator::TPtr& mdbEndpointGenerator, const TString& traceId) : ActorSystem(actorSystem) , Recipient(recipient) , YdbMvpEndpoint(ydbMvpEndpoint) , MdbGateway(mdbGateway) - , mdbEndpointGenerator(std::move(mdbEndpointGenerator)) + , MdbEndpointGenerator(mdbEndpointGenerator) , TraceId(traceId) { } @@ -45,7 +46,7 @@ TFuture<NYql::TDatabaseResolverResponse> TDatabaseAsyncResolverImpl::ResolveIds( ActorSystem->Send(new NActors::IEventHandle(Recipient, callbackId, new TEvents::TEvEndpointRequest(ids, YdbMvpEndpoint, MdbGateway, - TraceId, mdbEndpointGenerator))); + TraceId, MdbEndpointGenerator))); return promise.GetFuture(); } diff --git a/ydb/core/fq/libs/db_id_async_resolver_impl/db_async_resolver_impl.h b/ydb/core/fq/libs/db_id_async_resolver_impl/db_async_resolver_impl.h index 9e86ab16c0b..f22f4e764a2 100644 --- a/ydb/core/fq/libs/db_id_async_resolver_impl/db_async_resolver_impl.h +++ b/ydb/core/fq/libs/db_id_async_resolver_impl/db_async_resolver_impl.h @@ -1,6 +1,5 @@ #pragma once -#include <ydb/core/fq/libs/events/events.h> #include <ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver.h> #include <ydb/library/yql/providers/common/db_id_async_resolver/mdb_endpoint_generator.h> #include <ydb/library/yql/providers/dq/actors/actor_helpers.h> @@ -14,7 +13,7 @@ public: const NActors::TActorId& recipient, const TString& ydbMvpEndpoint, const TString& mdbGateway, - NYql::IMdbEndpointGenerator::TPtr&& endpointGenerator, + const NYql::IMdbEndpointGenerator::TPtr& endpointGenerator, const TString& traceId = "" ); @@ -24,7 +23,7 @@ private: const NActors::TActorId Recipient; const TString YdbMvpEndpoint; const TString MdbGateway; - NYql::IMdbEndpointGenerator::TPtr mdbEndpointGenerator; + const NYql::IMdbEndpointGenerator::TPtr MdbEndpointGenerator; const TString TraceId; }; diff --git a/ydb/core/fq/libs/test_connection/test_connection.cpp b/ydb/core/fq/libs/test_connection/test_connection.cpp index 2d9621b4ada..521442ade45 100644 --- a/ydb/core/fq/libs/test_connection/test_connection.cpp +++ b/ydb/core/fq/libs/test_connection/test_connection.cpp @@ -107,6 +107,7 @@ class TTestConnectionActor : public NActors::TActorBootstrapped<TTestConnectionA TActorId DatabaseResolverActor; std::shared_ptr<NYql::IDatabaseAsyncResolver> DbResolver; NYql::IHTTPGateway::TPtr HttpGateway; + NYql::IMdbEndpointGenerator::TPtr MdbEndpointGenerator; public: TTestConnectionActor( @@ -131,6 +132,7 @@ public: , Counters(counters) , Signer(signer) , HttpGateway(httpGateway) + , MdbEndpointGenerator(NFq::MakeMdbEndpointGeneratorGeneric(commonConfig.GetMdbTransformHost())) {} static constexpr char ActorName[] = "YQ_TEST_CONNECTION"; @@ -144,7 +146,7 @@ public: DbResolver = std::make_shared<NFq::TDatabaseAsyncResolverImpl>( NActors::TActivationContext::ActorSystem(), DatabaseResolverActor, CommonConfig.GetYdbMvpCloudEndpoint(), CommonConfig.GetMdbGateway(), - NFq::MakeMdbEndpointGeneratorGeneric(CommonConfig.GetMdbTransformHost()) + MdbEndpointGenerator ); Become(&TTestConnectionActor::StateFunc); diff --git a/ydb/core/fq/libs/test_connection/test_connection.h b/ydb/core/fq/libs/test_connection/test_connection.h index 65517cc48b9..d3fd0c5c907 100644 --- a/ydb/core/fq/libs/test_connection/test_connection.h +++ b/ydb/core/fq/libs/test_connection/test_connection.h @@ -7,6 +7,8 @@ #include <ydb/core/fq/libs/db_id_async_resolver_impl/db_async_resolver_impl.h> #include <ydb/core/fq/libs/shared_resources/shared_resources.h> #include <ydb/core/fq/libs/signer/signer.h> +#include <ydb/library/yql/minikql/mkql_function_registry.h> +#include <ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h> #include <ydb/library/yql/providers/common/token_accessor/client/factory.h> #include <ydb/library/yql/providers/pq/cm_client/client.h> #include <ydb/public/api/protos/draft/fq.pb.h> diff --git a/ydb/core/kqp/CMakeLists.darwin-x86_64.txt b/ydb/core/kqp/CMakeLists.darwin-x86_64.txt index e82ec0d41ac..1bad2f50459 100644 --- a/ydb/core/kqp/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/kqp/CMakeLists.darwin-x86_64.txt @@ -12,6 +12,7 @@ add_subdirectory(compute_actor) add_subdirectory(counters) add_subdirectory(executer_actor) add_subdirectory(expr_nodes) +add_subdirectory(federated_query) add_subdirectory(gateway) add_subdirectory(host) add_subdirectory(node_service) diff --git a/ydb/core/kqp/CMakeLists.linux-aarch64.txt b/ydb/core/kqp/CMakeLists.linux-aarch64.txt index 9e4b855487f..5337a48cda3 100644 --- a/ydb/core/kqp/CMakeLists.linux-aarch64.txt +++ b/ydb/core/kqp/CMakeLists.linux-aarch64.txt @@ -12,6 +12,7 @@ add_subdirectory(compute_actor) add_subdirectory(counters) add_subdirectory(executer_actor) add_subdirectory(expr_nodes) +add_subdirectory(federated_query) add_subdirectory(gateway) add_subdirectory(host) add_subdirectory(node_service) diff --git a/ydb/core/kqp/CMakeLists.linux-x86_64.txt b/ydb/core/kqp/CMakeLists.linux-x86_64.txt index 9e4b855487f..5337a48cda3 100644 --- a/ydb/core/kqp/CMakeLists.linux-x86_64.txt +++ b/ydb/core/kqp/CMakeLists.linux-x86_64.txt @@ -12,6 +12,7 @@ add_subdirectory(compute_actor) add_subdirectory(counters) add_subdirectory(executer_actor) add_subdirectory(expr_nodes) +add_subdirectory(federated_query) add_subdirectory(gateway) add_subdirectory(host) add_subdirectory(node_service) diff --git a/ydb/core/kqp/CMakeLists.windows-x86_64.txt b/ydb/core/kqp/CMakeLists.windows-x86_64.txt index e82ec0d41ac..1bad2f50459 100644 --- a/ydb/core/kqp/CMakeLists.windows-x86_64.txt +++ b/ydb/core/kqp/CMakeLists.windows-x86_64.txt @@ -12,6 +12,7 @@ add_subdirectory(compute_actor) add_subdirectory(counters) add_subdirectory(executer_actor) add_subdirectory(expr_nodes) +add_subdirectory(federated_query) add_subdirectory(gateway) add_subdirectory(host) add_subdirectory(node_service) diff --git a/ydb/core/kqp/compile_service/CMakeLists.darwin-x86_64.txt b/ydb/core/kqp/compile_service/CMakeLists.darwin-x86_64.txt index 0dfabe36a35..1288e5d8438 100644 --- a/ydb/core/kqp/compile_service/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/kqp/compile_service/CMakeLists.darwin-x86_64.txt @@ -16,9 +16,9 @@ target_link_libraries(core-kqp-compile_service PUBLIC yutil ydb-core-actorlib_impl ydb-core-base - core-kqp-host kqp-common-simple - providers-common-http_gateway + core-kqp-federated_query + core-kqp-host ) target_sources(core-kqp-compile_service PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kqp/compile_service/kqp_compile_actor.cpp diff --git a/ydb/core/kqp/compile_service/CMakeLists.linux-aarch64.txt b/ydb/core/kqp/compile_service/CMakeLists.linux-aarch64.txt index e3c1d4e8e3d..04756092e49 100644 --- a/ydb/core/kqp/compile_service/CMakeLists.linux-aarch64.txt +++ b/ydb/core/kqp/compile_service/CMakeLists.linux-aarch64.txt @@ -17,9 +17,9 @@ target_link_libraries(core-kqp-compile_service PUBLIC yutil ydb-core-actorlib_impl ydb-core-base - core-kqp-host kqp-common-simple - providers-common-http_gateway + core-kqp-federated_query + core-kqp-host ) target_sources(core-kqp-compile_service PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kqp/compile_service/kqp_compile_actor.cpp diff --git a/ydb/core/kqp/compile_service/CMakeLists.linux-x86_64.txt b/ydb/core/kqp/compile_service/CMakeLists.linux-x86_64.txt index e3c1d4e8e3d..04756092e49 100644 --- a/ydb/core/kqp/compile_service/CMakeLists.linux-x86_64.txt +++ b/ydb/core/kqp/compile_service/CMakeLists.linux-x86_64.txt @@ -17,9 +17,9 @@ target_link_libraries(core-kqp-compile_service PUBLIC yutil ydb-core-actorlib_impl ydb-core-base - core-kqp-host kqp-common-simple - providers-common-http_gateway + core-kqp-federated_query + core-kqp-host ) target_sources(core-kqp-compile_service PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kqp/compile_service/kqp_compile_actor.cpp diff --git a/ydb/core/kqp/compile_service/CMakeLists.windows-x86_64.txt b/ydb/core/kqp/compile_service/CMakeLists.windows-x86_64.txt index 0dfabe36a35..1288e5d8438 100644 --- a/ydb/core/kqp/compile_service/CMakeLists.windows-x86_64.txt +++ b/ydb/core/kqp/compile_service/CMakeLists.windows-x86_64.txt @@ -16,9 +16,9 @@ target_link_libraries(core-kqp-compile_service PUBLIC yutil ydb-core-actorlib_impl ydb-core-base - core-kqp-host kqp-common-simple - providers-common-http_gateway + core-kqp-federated_query + core-kqp-host ) target_sources(core-kqp-compile_service PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kqp/compile_service/kqp_compile_actor.cpp diff --git a/ydb/core/kqp/compile_service/kqp_compile_actor.cpp b/ydb/core/kqp/compile_service/kqp_compile_actor.cpp index 7de2211aa82..22894aefe6f 100644 --- a/ydb/core/kqp/compile_service/kqp_compile_actor.cpp +++ b/ydb/core/kqp/compile_service/kqp_compile_actor.cpp @@ -57,18 +57,19 @@ public: return NKikimrServices::TActivity::KQP_COMPILE_ACTOR; } - TKqpCompileActor(const TActorId& owner, const TKqpSettings::TConstPtr& kqpSettings, const TTableServiceConfig& serviceConfig, - const NKikimrConfig::TMetadataProviderConfig& metadataProviderConfig, NYql::IHTTPGateway::TPtr httpGateway, + TKqpCompileActor(const TActorId& owner, const TKqpSettings::TConstPtr& kqpSettings, + const TTableServiceConfig& serviceConfig, + const NKikimrConfig::TMetadataProviderConfig& metadataProviderConfig, TIntrusivePtr<TModuleResolverState> moduleResolverState, TIntrusivePtr<TKqpCounters> counters, - NYql::ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, const TString& uid, const TKqpQueryId& queryId, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, - TKqpDbCountersPtr dbCounters, NWilson::TTraceId traceId, TKqpTempTablesState::TConstPtr tempTablesState) + TKqpDbCountersPtr dbCounters, + std::optional<TKqpFederatedQuerySetup> federatedQuerySetup, + NWilson::TTraceId traceId, TKqpTempTablesState::TConstPtr tempTablesState) : Owner(owner) - , HttpGateway(std::move(httpGateway)) , ModuleResolverState(moduleResolverState) , Counters(counters) - , CredentialsFactory(std::move(credentialsFactory)) + , FederatedQuerySetup(federatedQuerySetup) , Uid(uid) , QueryId(queryId) , QueryRef(QueryId.Text, QueryId.QueryParameterTypes) @@ -126,7 +127,7 @@ public: Config->FeatureFlags = AppData(ctx)->FeatureFlags; KqpHost = CreateKqpHost(Gateway, QueryId.Cluster, QueryId.Database, Config, ModuleResolverState->ModuleResolver, - HttpGateway, AppData(ctx)->FunctionRegistry, false, false, CredentialsFactory, std::move(TempTablesState)); + FederatedQuerySetup, AppData(ctx)->FunctionRegistry, false, false, std::move(TempTablesState)); IKqpHost::TPrepareSettings prepareSettings; prepareSettings.DocumentApiRestricted = QueryId.Settings.DocumentApiRestricted; @@ -370,10 +371,9 @@ private: private: TActorId Owner; - NYql::IHTTPGateway::TPtr HttpGateway; TIntrusivePtr<TModuleResolverState> ModuleResolverState; TIntrusivePtr<TKqpCounters> Counters; - NYql::ISecuredServiceAccountCredentialsFactory::TPtr CredentialsFactory; + std::optional<TKqpFederatedQuerySetup> FederatedQuerySetup; TString Uid; TKqpQueryId QueryId; TKqpQueryRef QueryRef; @@ -420,16 +420,19 @@ void ApplyServiceConfig(TKikimrConfiguration& kqpConfig, const TTableServiceConf kqpConfig.PredicateExtract20 = serviceConfig.GetPredicateExtract20(); } -IActor* CreateKqpCompileActor(const TActorId& owner, const TKqpSettings::TConstPtr& kqpSettings, const TTableServiceConfig& serviceConfig, - const NKikimrConfig::TMetadataProviderConfig& metadataProviderConfig, NYql::IHTTPGateway::TPtr httpGateway, +IActor* CreateKqpCompileActor(const TActorId& owner, const TKqpSettings::TConstPtr& kqpSettings, + const TTableServiceConfig& serviceConfig, + const NKikimrConfig::TMetadataProviderConfig& metadataProviderConfig, TIntrusivePtr<TModuleResolverState> moduleResolverState, TIntrusivePtr<TKqpCounters> counters, - NYql::ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, const TString& uid, const TKqpQueryId& query, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, + std::optional<TKqpFederatedQuerySetup> federatedQuerySetup, TKqpDbCountersPtr dbCounters, NWilson::TTraceId traceId, TKqpTempTablesState::TConstPtr tempTablesState) { - return new TKqpCompileActor(owner, kqpSettings, serviceConfig, metadataProviderConfig, std::move(httpGateway), moduleResolverState, - counters, std::move(credentialsFactory), - uid, query, userToken, dbCounters, std::move(traceId), std::move(tempTablesState)); + return new TKqpCompileActor(owner, kqpSettings, serviceConfig, metadataProviderConfig, + moduleResolverState, counters, + uid, query, userToken, dbCounters, + federatedQuerySetup, + std::move(traceId), std::move(tempTablesState)); } } // namespace NKqp diff --git a/ydb/core/kqp/compile_service/kqp_compile_service.cpp b/ydb/core/kqp/compile_service/kqp_compile_service.cpp index 960fdc85151..7bd700c4a86 100644 --- a/ydb/core/kqp/compile_service/kqp_compile_service.cpp +++ b/ydb/core/kqp/compile_service/kqp_compile_service.cpp @@ -311,8 +311,8 @@ public: const NKikimrConfig::TMetadataProviderConfig& metadataProviderConfig, const TKqpSettings::TConstPtr& kqpSettings, TIntrusivePtr<TModuleResolverState> moduleResolverState, TIntrusivePtr<TKqpCounters> counters, std::shared_ptr<IQueryReplayBackendFactory> queryReplayFactory, - NYql::ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, - NYql::IHTTPGateway::TPtr httpGateway) + std::optional<TKqpFederatedQuerySetup> federatedQuerySetup + ) : Config(serviceConfig) , MetadataProviderConfig(metadataProviderConfig) , KqpSettings(kqpSettings) @@ -321,8 +321,7 @@ public: , QueryCache(Config.GetCompileQueryCacheSize(), TDuration::Seconds(Config.GetCompileQueryCacheTTLSec())) , RequestsQueue(Config.GetCompileRequestQueueSize()) , QueryReplayFactory(std::move(queryReplayFactory)) - , CredentialsFactory(std::move(credentialsFactory)) - , HttpGateway(std::move(httpGateway)) + , FederatedQuerySetup(federatedQuerySetup) {} void Bootstrap(const TActorContext& ctx) { @@ -739,8 +738,9 @@ private: } void StartCompilation(TKqpCompileRequest&& request, const TActorContext& ctx) { - auto compileActor = CreateKqpCompileActor(ctx.SelfID, KqpSettings, Config, MetadataProviderConfig, HttpGateway, ModuleResolverState, Counters, - CredentialsFactory, request.Uid, request.Query, request.UserToken, request.DbCounters, request.CompileServiceSpan.GetTraceId(), std::move(request.TempTablesState)); + auto compileActor = CreateKqpCompileActor(ctx.SelfID, KqpSettings, Config, MetadataProviderConfig, ModuleResolverState, Counters, + request.Uid, request.Query, request.UserToken, FederatedQuerySetup, request.DbCounters, request.CompileServiceSpan.GetTraceId(), + std::move(request.TempTablesState)); auto compileActorId = ctx.ExecutorThread.RegisterActor(compileActor, TMailboxType::HTSwap, AppData(ctx)->UserPoolId); @@ -828,19 +828,18 @@ private: TKqpQueryCache QueryCache; TKqpRequestsQueue RequestsQueue; std::shared_ptr<IQueryReplayBackendFactory> QueryReplayFactory; - NYql::ISecuredServiceAccountCredentialsFactory::TPtr CredentialsFactory; - NYql::IHTTPGateway::TPtr HttpGateway; + std::optional<TKqpFederatedQuerySetup> FederatedQuerySetup; }; IActor* CreateKqpCompileService(const TTableServiceConfig& serviceConfig, const NKikimrConfig::TMetadataProviderConfig& metadataProviderConfig, const TKqpSettings::TConstPtr& kqpSettings, TIntrusivePtr<TModuleResolverState> moduleResolverState, TIntrusivePtr<TKqpCounters> counters, std::shared_ptr<IQueryReplayBackendFactory> queryReplayFactory, - NYql::ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, - NYql::IHTTPGateway::TPtr httpGateway) + std::optional<TKqpFederatedQuerySetup> federatedQuerySetup + ) { return new TKqpCompileService(serviceConfig, metadataProviderConfig, kqpSettings, moduleResolverState, counters, - std::move(queryReplayFactory), std::move(credentialsFactory), std::move(httpGateway)); + std::move(queryReplayFactory), federatedQuerySetup); } } // namespace NKqp diff --git a/ydb/core/kqp/compile_service/kqp_compile_service.h b/ydb/core/kqp/compile_service/kqp_compile_service.h index 32c28748e19..ff5ac1f218b 100644 --- a/ydb/core/kqp/compile_service/kqp_compile_service.h +++ b/ydb/core/kqp/compile_service/kqp_compile_service.h @@ -3,8 +3,7 @@ #include <ydb/core/kqp/common/kqp.h> #include <ydb/core/kqp/common/simple/temp_tables.h> #include <ydb/core/kqp/gateway/kqp_gateway.h> -#include <ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h> -#include <ydb/library/yql/providers/common/token_accessor/client/factory.h> +#include <ydb/core/kqp/federated_query/kqp_federated_query_helpers.h> namespace NKikimr { namespace NKqp { @@ -13,19 +12,19 @@ IActor* CreateKqpCompileService(const NKikimrConfig::TTableServiceConfig& servic const NKikimrConfig::TMetadataProviderConfig& metadataProviderConfig, const TKqpSettings::TConstPtr& kqpSettings, TIntrusivePtr<TModuleResolverState> moduleResolverState, TIntrusivePtr<TKqpCounters> counters, std::shared_ptr<IQueryReplayBackendFactory> queryReplayFactory, - NYql::ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, - NYql::IHTTPGateway::TPtr httpGateway); + std::optional<TKqpFederatedQuerySetup> federatedQuerySetup + ); IActor* CreateKqpCompileComputationPatternService(const NKikimrConfig::TTableServiceConfig& serviceConfig, TIntrusivePtr<TKqpCounters> counters); IActor* CreateKqpCompileActor(const TActorId& owner, const TKqpSettings::TConstPtr& kqpSettings, - const NKikimrConfig::TTableServiceConfig& serviceConfig, - const NKikimrConfig::TMetadataProviderConfig& metadataProviderConfig, NYql::IHTTPGateway::TPtr httpGateway, + const NKikimrConfig::TTableServiceConfig& serviceConfig, + const NKikimrConfig::TMetadataProviderConfig& metadataProviderConfig, TIntrusivePtr<TModuleResolverState> moduleResolverState, TIntrusivePtr<TKqpCounters> counters, - NYql::ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, const TString& uid, const TKqpQueryId& query, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, + std::optional<TKqpFederatedQuerySetup> federatedQuerySetup, TKqpDbCountersPtr dbCounters, NWilson::TTraceId traceId = {}, TKqpTempTablesState::TConstPtr tempTablesState = nullptr); diff --git a/ydb/core/kqp/compile_service/ya.make b/ydb/core/kqp/compile_service/ya.make index 9eee94d92b5..53ba78b8be3 100644 --- a/ydb/core/kqp/compile_service/ya.make +++ b/ydb/core/kqp/compile_service/ya.make @@ -9,9 +9,9 @@ SRCS( PEERDIR( ydb/core/actorlib_impl ydb/core/base - ydb/core/kqp/host ydb/core/kqp/common/simple - ydb/library/yql/providers/common/http_gateway + ydb/core/kqp/federated_query + ydb/core/kqp/host ) YQL_LAST_ABI_VERSION() diff --git a/ydb/core/kqp/compute_actor/CMakeLists.darwin-x86_64.txt b/ydb/core/kqp/compute_actor/CMakeLists.darwin-x86_64.txt index cbd0d6bc71d..c27956e5aa4 100644 --- a/ydb/core/kqp/compute_actor/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/kqp/compute_actor/CMakeLists.darwin-x86_64.txt @@ -22,11 +22,12 @@ target_link_libraries(core-kqp-compute_actor PUBLIC yutil ydb-core-actorlib_impl ydb-core-base + core-kqp-federated_query core-kqp-runtime core-tx-datashard core-tx-scheme_cache dq-actors-compute - providers-common-http_gateway + providers-generic-actors providers-s3-actors yql-public-issue tools-enum_parser-enum_serialization_runtime diff --git a/ydb/core/kqp/compute_actor/CMakeLists.linux-aarch64.txt b/ydb/core/kqp/compute_actor/CMakeLists.linux-aarch64.txt index 8b25a121abc..4eb2135274e 100644 --- a/ydb/core/kqp/compute_actor/CMakeLists.linux-aarch64.txt +++ b/ydb/core/kqp/compute_actor/CMakeLists.linux-aarch64.txt @@ -23,11 +23,12 @@ target_link_libraries(core-kqp-compute_actor PUBLIC yutil ydb-core-actorlib_impl ydb-core-base + core-kqp-federated_query core-kqp-runtime core-tx-datashard core-tx-scheme_cache dq-actors-compute - providers-common-http_gateway + providers-generic-actors providers-s3-actors yql-public-issue tools-enum_parser-enum_serialization_runtime diff --git a/ydb/core/kqp/compute_actor/CMakeLists.linux-x86_64.txt b/ydb/core/kqp/compute_actor/CMakeLists.linux-x86_64.txt index 8b25a121abc..4eb2135274e 100644 --- a/ydb/core/kqp/compute_actor/CMakeLists.linux-x86_64.txt +++ b/ydb/core/kqp/compute_actor/CMakeLists.linux-x86_64.txt @@ -23,11 +23,12 @@ target_link_libraries(core-kqp-compute_actor PUBLIC yutil ydb-core-actorlib_impl ydb-core-base + core-kqp-federated_query core-kqp-runtime core-tx-datashard core-tx-scheme_cache dq-actors-compute - providers-common-http_gateway + providers-generic-actors providers-s3-actors yql-public-issue tools-enum_parser-enum_serialization_runtime diff --git a/ydb/core/kqp/compute_actor/CMakeLists.windows-x86_64.txt b/ydb/core/kqp/compute_actor/CMakeLists.windows-x86_64.txt index cbd0d6bc71d..c27956e5aa4 100644 --- a/ydb/core/kqp/compute_actor/CMakeLists.windows-x86_64.txt +++ b/ydb/core/kqp/compute_actor/CMakeLists.windows-x86_64.txt @@ -22,11 +22,12 @@ target_link_libraries(core-kqp-compute_actor PUBLIC yutil ydb-core-actorlib_impl ydb-core-base + core-kqp-federated_query core-kqp-runtime core-tx-datashard core-tx-scheme_cache dq-actors-compute - providers-common-http_gateway + providers-generic-actors providers-s3-actors yql-public-issue tools-enum_parser-enum_serialization_runtime diff --git a/ydb/core/kqp/compute_actor/kqp_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_compute_actor.cpp index 97e6c30fa09..d0232c0ea66 100644 --- a/ydb/core/kqp/compute_actor/kqp_compute_actor.cpp +++ b/ydb/core/kqp/compute_actor/kqp_compute_actor.cpp @@ -10,6 +10,8 @@ #include <ydb/core/kqp/runtime/kqp_stream_lookup_factory.h> #include <ydb/library/yql/providers/s3/actors/yql_s3_sink_factory.h> #include <ydb/library/yql/providers/s3/actors/yql_s3_source_factory.h> +#include <ydb/library/yql/providers/generic/actors/yql_generic_source_factory.h> + namespace NKikimr { namespace NMiniKQL { @@ -55,13 +57,23 @@ TComputationNodeFactory GetKqpActorComputeFactory(TKqpScanComputeContext* comput namespace NKqp { -NYql::NDq::IDqAsyncIoFactory::TPtr CreateKqpAsyncIoFactory(TIntrusivePtr<TKqpCounters> counters, const NYql::IHTTPGateway::TPtr& httpGateway, const NYql::ISecuredServiceAccountCredentialsFactory::TPtr& credentialsFactory) { +NYql::NDq::IDqAsyncIoFactory::TPtr CreateKqpAsyncIoFactory( + TIntrusivePtr<TKqpCounters> counters, + std::optional<TKqpFederatedQuerySetup> federatedQuerySetup) { auto factory = MakeIntrusive<NYql::NDq::TDqAsyncIoFactory>(); RegisterStreamLookupActorFactory(*factory, counters); RegisterKqpReadActor(*factory, counters); - RegisterS3ReadActorFactory(*factory, credentialsFactory, httpGateway); - RegisterS3WriteActorFactory(*factory, credentialsFactory, httpGateway); RegisterSequencerActorFactory(*factory, counters); + + if (federatedQuerySetup) { + RegisterS3ReadActorFactory(*factory, federatedQuerySetup->CredentialsFactory, federatedQuerySetup->HttpGateway); + RegisterS3WriteActorFactory(*factory, federatedQuerySetup->CredentialsFactory, federatedQuerySetup->HttpGateway); + + if (federatedQuerySetup->ConnectorClient) { + RegisterGenericReadActorFactory(*factory, federatedQuerySetup->CredentialsFactory, federatedQuerySetup->ConnectorClient); + } + } + return factory; } diff --git a/ydb/core/kqp/compute_actor/kqp_compute_actor.h b/ydb/core/kqp/compute_actor/kqp_compute_actor.h index 1911a336e61..fd4c21150b6 100644 --- a/ydb/core/kqp/compute_actor/kqp_compute_actor.h +++ b/ydb/core/kqp/compute_actor/kqp_compute_actor.h @@ -1,11 +1,11 @@ #pragma once -#include <ydb/core/kqp/counters/kqp_counters.h> + #include <ydb/core/kqp/compute_actor/kqp_compute_events.h> +#include <ydb/core/kqp/counters/kqp_counters.h> +#include <ydb/core/kqp/federated_query/kqp_federated_query_helpers.h> +#include <ydb/core/scheme/scheme_tabledefs.h> #include <ydb/library/yql/dq/actors/compute/dq_compute_actor.h> #include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io_factory.h> -#include <ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h> -#include <ydb/library/yql/providers/common/token_accessor/client/factory.h> -#include <ydb/core/scheme/scheme_tabledefs.h> namespace NKikimr { @@ -59,7 +59,8 @@ IActor* CreateKqpScanFetcher(const NKikimrKqp::TKqpSnapshot& snapshot, std::vect const NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta& meta, const NYql::NDq::TComputeRuntimeSettings& settings, const ui64 txId, const TShardsScanningPolicy& shardsScanningPolicy, TIntrusivePtr<TKqpCounters> counters, NWilson::TTraceId traceId); -NYql::NDq::IDqAsyncIoFactory::TPtr CreateKqpAsyncIoFactory(TIntrusivePtr<TKqpCounters> counters, const NYql::IHTTPGateway::TPtr& httpGateway, const NYql::ISecuredServiceAccountCredentialsFactory::TPtr& credentialsFactory); +NYql::NDq::IDqAsyncIoFactory::TPtr CreateKqpAsyncIoFactory( + TIntrusivePtr<TKqpCounters> counters, std::optional<TKqpFederatedQuerySetup> federatedQuerySetup); } // namespace NKqp } // namespace NKikimr diff --git a/ydb/core/kqp/compute_actor/ya.make b/ydb/core/kqp/compute_actor/ya.make index a2f59717f2b..570d174d739 100644 --- a/ydb/core/kqp/compute_actor/ya.make +++ b/ydb/core/kqp/compute_actor/ya.make @@ -17,11 +17,12 @@ SRCS( PEERDIR( ydb/core/actorlib_impl ydb/core/base + ydb/core/kqp/federated_query ydb/core/kqp/runtime ydb/core/tx/datashard ydb/core/tx/scheme_cache ydb/library/yql/dq/actors/compute - ydb/library/yql/providers/common/http_gateway + ydb/library/yql/providers/generic/actors ydb/library/yql/providers/s3/actors ydb/library/yql/public/issue ) diff --git a/ydb/core/kqp/federated_query/CMakeLists.darwin-x86_64.txt b/ydb/core/kqp/federated_query/CMakeLists.darwin-x86_64.txt new file mode 100644 index 00000000000..551388ba4e2 --- /dev/null +++ b/ydb/core/kqp/federated_query/CMakeLists.darwin-x86_64.txt @@ -0,0 +1,26 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(core-kqp-federated_query) +target_compile_options(core-kqp-federated_query PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_link_libraries(core-kqp-federated_query PUBLIC + contrib-libs-cxxsupp + yutil + ydb-core-base + fq-libs-db_id_async_resolver_impl + fq-libs-grpc + library-db_pool-protos + providers-common-http_gateway + generic-connector-libcpp +) +target_sources(core-kqp-federated_query PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/kqp/federated_query/kqp_federated_query_helpers.cpp +) diff --git a/ydb/core/kqp/federated_query/CMakeLists.linux-aarch64.txt b/ydb/core/kqp/federated_query/CMakeLists.linux-aarch64.txt new file mode 100644 index 00000000000..a662d35a7cc --- /dev/null +++ b/ydb/core/kqp/federated_query/CMakeLists.linux-aarch64.txt @@ -0,0 +1,27 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(core-kqp-federated_query) +target_compile_options(core-kqp-federated_query PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_link_libraries(core-kqp-federated_query PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + ydb-core-base + fq-libs-db_id_async_resolver_impl + fq-libs-grpc + library-db_pool-protos + providers-common-http_gateway + generic-connector-libcpp +) +target_sources(core-kqp-federated_query PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/kqp/federated_query/kqp_federated_query_helpers.cpp +) diff --git a/ydb/core/kqp/federated_query/CMakeLists.linux-x86_64.txt b/ydb/core/kqp/federated_query/CMakeLists.linux-x86_64.txt new file mode 100644 index 00000000000..a662d35a7cc --- /dev/null +++ b/ydb/core/kqp/federated_query/CMakeLists.linux-x86_64.txt @@ -0,0 +1,27 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(core-kqp-federated_query) +target_compile_options(core-kqp-federated_query PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_link_libraries(core-kqp-federated_query PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + ydb-core-base + fq-libs-db_id_async_resolver_impl + fq-libs-grpc + library-db_pool-protos + providers-common-http_gateway + generic-connector-libcpp +) +target_sources(core-kqp-federated_query PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/kqp/federated_query/kqp_federated_query_helpers.cpp +) diff --git a/ydb/core/kqp/federated_query/CMakeLists.txt b/ydb/core/kqp/federated_query/CMakeLists.txt new file mode 100644 index 00000000000..f8b31df0c11 --- /dev/null +++ b/ydb/core/kqp/federated_query/CMakeLists.txt @@ -0,0 +1,17 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-aarch64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64") + include(CMakeLists.darwin-x86_64.txt) +elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA) + include(CMakeLists.windows-x86_64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-x86_64.txt) +endif() diff --git a/ydb/core/kqp/federated_query/CMakeLists.windows-x86_64.txt b/ydb/core/kqp/federated_query/CMakeLists.windows-x86_64.txt new file mode 100644 index 00000000000..551388ba4e2 --- /dev/null +++ b/ydb/core/kqp/federated_query/CMakeLists.windows-x86_64.txt @@ -0,0 +1,26 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(core-kqp-federated_query) +target_compile_options(core-kqp-federated_query PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_link_libraries(core-kqp-federated_query PUBLIC + contrib-libs-cxxsupp + yutil + ydb-core-base + fq-libs-db_id_async_resolver_impl + fq-libs-grpc + library-db_pool-protos + providers-common-http_gateway + generic-connector-libcpp +) +target_sources(core-kqp-federated_query PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/kqp/federated_query/kqp_federated_query_helpers.cpp +) diff --git a/ydb/core/kqp/federated_query/kqp_federated_query_helpers.cpp b/ydb/core/kqp/federated_query/kqp_federated_query_helpers.cpp new file mode 100644 index 00000000000..345fc8db6cd --- /dev/null +++ b/ydb/core/kqp/federated_query/kqp_federated_query_helpers.cpp @@ -0,0 +1,122 @@ +#include "kqp_federated_query_helpers.h" + +#include <library/cpp/actors/http/http_proxy.h> + +#include <ydb/core/fq/libs/actors/database_resolver.h> +#include <ydb/core/fq/libs/actors/proxy.h> +#include <ydb/core/fq/libs/db_id_async_resolver_impl/db_async_resolver_impl.h> +#include <ydb/core/fq/libs/db_id_async_resolver_impl/mdb_endpoint_generator.h> + +namespace NKikimr::NKqp { + + NYql::THttpGatewayConfig DefaultHttpGatewayConfig() { + NYql::THttpGatewayConfig config; + config.SetMaxInFlightCount(2000); + config.SetMaxSimulatenousDownloadsSize(2000000000); + config.SetBuffersSizePerStream(5000000); + config.SetConnectionTimeoutSeconds(15); + config.SetRequestTimeoutSeconds(0); + return config; + } + + std::pair<TString, bool> ParseGrpcEndpoint(const TString& endpoint) { + TStringBuf scheme; + TStringBuf host; + TStringBuf uri; + NHttp::CrackURL(endpoint, scheme, host, uri); + + return std::make_pair(ToString(host), scheme == "grpcs"); + } + + // TKqpFederatedQuerySetupFactoryDefault contains network clients and service actors necessary + // for federated queries. HTTP Gateway (required by S3 provider) is run by default even without + // explicit configuration. Token Accessor and Connector Client are run only if config is provided. + TKqpFederatedQuerySetupFactoryDefault::TKqpFederatedQuerySetupFactoryDefault( + NActors::TActorSystemSetup* setup, + const NKikimr::TAppData* appData, + const NKikimrConfig::TAppConfig& appConfig) { + const auto& queryServiceConfig = appConfig.GetQueryServiceConfig(); + + // Initialize HTTP Gateway + HttpGatewayConfig = queryServiceConfig.HasHttpGateway() ? queryServiceConfig.GetHttpGateway() : DefaultHttpGatewayConfig(); + HttpGateway = NYql::IHTTPGateway::Make(&HttpGatewayConfig); + + // Initialize Token Accessor + if (appConfig.GetAuthConfig().HasTokenAccessorConfig()) { + const auto& tokenAccessorConfig = appConfig.GetAuthConfig().GetTokenAccessorConfig(); + TString caContent; + if (const auto& path = tokenAccessorConfig.GetSslCaCert()) { + caContent = TUnbufferedFileInput(path).ReadAll(); + } + + auto parsed = ParseGrpcEndpoint(tokenAccessorConfig.GetEndpoint()); + CredentialsFactory = NYql::CreateSecuredServiceAccountCredentialsOverTokenAccessorFactory( + parsed.first, + parsed.second, + caContent, + tokenAccessorConfig.GetConnectionPoolSize()); + } + + // Initialize Connector client + if (queryServiceConfig.HasConnector()) { + ConnectorClient = NYql::NConnector::MakeClientGRPC(queryServiceConfig.GetConnector()); + + if (queryServiceConfig.HasMdbGateway()) { + MdbGateway = queryServiceConfig.GetMdbGateway(); + } + + if (queryServiceConfig.HasMdbTransformHost()) { + MdbEndpointGenerator = NFq::MakeMdbEndpointGeneratorGeneric(queryServiceConfig.GetMdbTransformHost()); + } + + // Create actors required for MDB database resolving + if (CredentialsFactory) { + auto httpProxyActor = NHttp::CreateHttpProxy(); + auto httpProxyActorId = NFq::MakeYqlAnalyticsHttpProxyId(); + setup->LocalServices.push_back( + std::make_pair( + httpProxyActorId, + TActorSetupCmd(httpProxyActor, TMailboxType::HTSwap, appData->UserPoolId))); + + // FIXME: how to choose appropriate ActorID? + DatabaseResolverActorId = NFq::MakeDatabaseResolverActorId(); + auto databaseResolverActor = NFq::CreateDatabaseResolver(httpProxyActorId, CredentialsFactory); + setup->LocalServices.push_back( + std::make_pair(DatabaseResolverActorId.value(), + TActorSetupCmd(databaseResolverActor, TMailboxType::HTSwap, appData->UserPoolId))); + } + } + } + + std::optional<TKqpFederatedQuerySetup> TKqpFederatedQuerySetupFactoryDefault::Make(NActors::TActorSystem* actorSystem) { + auto result = TKqpFederatedQuerySetup{ + HttpGateway, + ConnectorClient, + CredentialsFactory, + nullptr}; + + // Init DatabaseAsyncResolver only if all requirements are met + if (DatabaseResolverActorId && MdbGateway && MdbEndpointGenerator) { + result.DatabaseAsyncResovler = std::make_shared<NFq::TDatabaseAsyncResolverImpl>( + actorSystem, + DatabaseResolverActorId.value(), + "", // TODO: use YDB Gateway endpoint? + MdbGateway.value(), + MdbEndpointGenerator); + } + + return result; + } + + IKqpFederatedQuerySetupFactory::TPtr MakeKqpFederatedQuerySetupFactory( + NActors::TActorSystemSetup* setup, + const NKikimr::TAppData* appData, + const NKikimrConfig::TAppConfig& appConfig) { + // If Query Service is disabled, just do nothing + if (!appData->FeatureFlags.GetEnableScriptExecutionOperations()) { + return std::make_shared<TKqpFederatedQuerySetupFactoryNoop>(); + } + + return std::make_shared<NKikimr::NKqp::TKqpFederatedQuerySetupFactoryDefault>(setup, appData, appConfig); + } +} diff --git a/ydb/core/kqp/federated_query/kqp_federated_query_helpers.h b/ydb/core/kqp/federated_query/kqp_federated_query_helpers.h new file mode 100644 index 00000000000..67b1eaecfa7 --- /dev/null +++ b/ydb/core/kqp/federated_query/kqp_federated_query_helpers.h @@ -0,0 +1,84 @@ +#pragma once + +#include <library/cpp/actors/core/actorsystem.h> + +#include <ydb/core/base/appdata.h> +#include <ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver.h> +#include <ydb/library/yql/providers/common/db_id_async_resolver/mdb_endpoint_generator.h> +#include <ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h> +#include <ydb/library/yql/providers/common/token_accessor/client/factory.h> +#include <ydb/library/yql/providers/generic/connector/libcpp/client.h> + +namespace NKikimr::NKqp { + + struct TKqpFederatedQuerySetup { + NYql::IHTTPGateway::TPtr HttpGateway; + NYql::NConnector::IClient::TPtr ConnectorClient; + NYql::ISecuredServiceAccountCredentialsFactory::TPtr CredentialsFactory; + NYql::IDatabaseAsyncResolver::TPtr DatabaseAsyncResovler; + }; + + struct IKqpFederatedQuerySetupFactory { + using TPtr = std::shared_ptr<IKqpFederatedQuerySetupFactory>; + virtual std::optional<TKqpFederatedQuerySetup> Make(NActors::TActorSystem* actorSystem) = 0; + virtual ~IKqpFederatedQuerySetupFactory() = default; + }; + + struct TKqpFederatedQuerySetupFactoryNoop: public IKqpFederatedQuerySetupFactory { + std::optional<TKqpFederatedQuerySetup> Make(NActors::TActorSystem*) override { + return std::nullopt; + } + }; + + struct TKqpFederatedQuerySetupFactoryDefault: public IKqpFederatedQuerySetupFactory { + TKqpFederatedQuerySetupFactoryDefault(){}; + + TKqpFederatedQuerySetupFactoryDefault( + NActors::TActorSystemSetup* setup, + const NKikimr::TAppData* appData, + const NKikimrConfig::TAppConfig& appConfig); + + std::optional<TKqpFederatedQuerySetup> Make(NActors::TActorSystem* actorSystem) override; + + private: + NYql::THttpGatewayConfig HttpGatewayConfig; + NYql::IHTTPGateway::TPtr HttpGateway; + NYql::ISecuredServiceAccountCredentialsFactory::TPtr CredentialsFactory; + NYql::NConnector::IClient::TPtr ConnectorClient; + std::optional<NActors::TActorId> DatabaseResolverActorId; + NYql::IMdbEndpointGenerator::TPtr MdbEndpointGenerator; + std::optional<TString> MdbGateway; + }; + + struct TKqpFederatedQuerySetupFactoryMock: public IKqpFederatedQuerySetupFactory { + TKqpFederatedQuerySetupFactoryMock() = delete; + + TKqpFederatedQuerySetupFactoryMock( + NYql::IHTTPGateway::TPtr httpGateway, + NYql::NConnector::IClient::TPtr connectorClient, + NYql::ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, + NYql::IDatabaseAsyncResolver::TPtr databaseAsyncResovler) + : HttpGateway(httpGateway) + , ConnectorClient(connectorClient) + , CredentialsFactory(credentialsFactory) + , DatabaseAsyncResovler(databaseAsyncResovler) + { + } + + std::optional<TKqpFederatedQuerySetup> Make(NActors::TActorSystem*) override { + return TKqpFederatedQuerySetup{ + HttpGateway, ConnectorClient, CredentialsFactory, DatabaseAsyncResovler}; + } + + private: + NYql::IHTTPGateway::TPtr HttpGateway; + NYql::NConnector::IClient::TPtr ConnectorClient; + NYql::ISecuredServiceAccountCredentialsFactory::TPtr CredentialsFactory; + NYql::IDatabaseAsyncResolver::TPtr DatabaseAsyncResovler; + }; + + IKqpFederatedQuerySetupFactory::TPtr MakeKqpFederatedQuerySetupFactory( + NActors::TActorSystemSetup* setup, + const NKikimr::TAppData* appData, + const NKikimrConfig::TAppConfig& config); +} diff --git a/ydb/core/kqp/federated_query/ya.make b/ydb/core/kqp/federated_query/ya.make new file mode 100644 index 00000000000..4af74496ca6 --- /dev/null +++ b/ydb/core/kqp/federated_query/ya.make @@ -0,0 +1,18 @@ +LIBRARY() + +SRCS( + kqp_federated_query_helpers.cpp +) + +PEERDIR( + ydb/core/base + ydb/core/fq/libs/db_id_async_resolver_impl + ydb/core/fq/libs/grpc + ydb/library/db_pool/protos + ydb/library/yql/providers/common/http_gateway + ydb/library/yql/providers/generic/connector/libcpp +) + +YQL_LAST_ABI_VERSION() + +END() diff --git a/ydb/core/kqp/host/CMakeLists.darwin-x86_64.txt b/ydb/core/kqp/host/CMakeLists.darwin-x86_64.txt index 95d8c5b2728..1170ac5cd84 100644 --- a/ydb/core/kqp/host/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/kqp/host/CMakeLists.darwin-x86_64.txt @@ -16,6 +16,7 @@ target_link_libraries(core-kqp-host PUBLIC yutil ydb-core-base core-kqp-common + core-kqp-federated_query core-kqp-opt core-kqp-provider tx-long_tx_service-public @@ -27,6 +28,7 @@ target_link_libraries(core-kqp-host PUBLIC providers-common-http_gateway providers-common-udf_resolve yql-providers-config + providers-generic-provider providers-result-provider providers-s3-provider ) diff --git a/ydb/core/kqp/host/CMakeLists.linux-aarch64.txt b/ydb/core/kqp/host/CMakeLists.linux-aarch64.txt index bec5bec644d..4955fb9ec50 100644 --- a/ydb/core/kqp/host/CMakeLists.linux-aarch64.txt +++ b/ydb/core/kqp/host/CMakeLists.linux-aarch64.txt @@ -17,6 +17,7 @@ target_link_libraries(core-kqp-host PUBLIC yutil ydb-core-base core-kqp-common + core-kqp-federated_query core-kqp-opt core-kqp-provider tx-long_tx_service-public @@ -28,6 +29,7 @@ target_link_libraries(core-kqp-host PUBLIC providers-common-http_gateway providers-common-udf_resolve yql-providers-config + providers-generic-provider providers-result-provider providers-s3-provider ) diff --git a/ydb/core/kqp/host/CMakeLists.linux-x86_64.txt b/ydb/core/kqp/host/CMakeLists.linux-x86_64.txt index bec5bec644d..4955fb9ec50 100644 --- a/ydb/core/kqp/host/CMakeLists.linux-x86_64.txt +++ b/ydb/core/kqp/host/CMakeLists.linux-x86_64.txt @@ -17,6 +17,7 @@ target_link_libraries(core-kqp-host PUBLIC yutil ydb-core-base core-kqp-common + core-kqp-federated_query core-kqp-opt core-kqp-provider tx-long_tx_service-public @@ -28,6 +29,7 @@ target_link_libraries(core-kqp-host PUBLIC providers-common-http_gateway providers-common-udf_resolve yql-providers-config + providers-generic-provider providers-result-provider providers-s3-provider ) diff --git a/ydb/core/kqp/host/CMakeLists.windows-x86_64.txt b/ydb/core/kqp/host/CMakeLists.windows-x86_64.txt index 95d8c5b2728..1170ac5cd84 100644 --- a/ydb/core/kqp/host/CMakeLists.windows-x86_64.txt +++ b/ydb/core/kqp/host/CMakeLists.windows-x86_64.txt @@ -16,6 +16,7 @@ target_link_libraries(core-kqp-host PUBLIC yutil ydb-core-base core-kqp-common + core-kqp-federated_query core-kqp-opt core-kqp-provider tx-long_tx_service-public @@ -27,6 +28,7 @@ target_link_libraries(core-kqp-host PUBLIC providers-common-http_gateway providers-common-udf_resolve yql-providers-config + providers-generic-provider providers-result-provider providers-s3-provider ) diff --git a/ydb/core/kqp/host/kqp_host.cpp b/ydb/core/kqp/host/kqp_host.cpp index 93f08ca2237..2af39a3c389 100644 --- a/ydb/core/kqp/host/kqp_host.cpp +++ b/ydb/core/kqp/host/kqp_host.cpp @@ -18,6 +18,8 @@ #include <ydb/library/yql/providers/common/provider/yql_provider_names.h> #include <ydb/library/yql/providers/common/udf_resolve/yql_simple_udf_resolver.h> #include <ydb/library/yql/providers/s3/provider/yql_s3_provider.h> +#include <ydb/library/yql/providers/generic/provider/yql_generic_provider.h> +#include <ydb/library/yql/providers/generic/provider/yql_generic_state.h> #include <ydb/library/yql/minikql/invoke_builtins/mkql_builtins.h> #include <ydb/library/yql/sql/sql.h> @@ -891,17 +893,16 @@ class TKqpHost : public IKqpHost { public: TKqpHost(TIntrusivePtr<IKqpGateway> gateway, const TString& cluster, const TString& database, TKikimrConfiguration::TPtr config, IModuleResolver::TPtr moduleResolver, - NYql::IHTTPGateway::TPtr httpGateway, + std::optional<TKqpFederatedQuerySetup> federatedQuerySetup, const NKikimr::NMiniKQL::IFunctionRegistry* funcRegistry, bool keepConfigChanges, - bool isInternalCall, NYql::ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, TKqpTempTablesState::TConstPtr tempTablesState = nullptr) + bool isInternalCall, TKqpTempTablesState::TConstPtr tempTablesState = nullptr) : Gateway(gateway) , Cluster(cluster) , ExprCtx(new TExprContext()) , ModuleResolver(moduleResolver) , KeepConfigChanges(keepConfigChanges) , IsInternalCall(isInternalCall) - , CredentialsFactory(std::move(credentialsFactory)) - , HttpGateway(std::move(httpGateway)) + , FederatedQuerySetup(federatedQuerySetup) , SessionCtx(new TKikimrSessionContext(funcRegistry, config, TAppData::TimeProvider, TAppData::RandomProvider)) , ClustersMap({{Cluster, TString(KikimrProviderName)}}) , TypesCtx(MakeIntrusive<TTypeAnnotationContext>()) @@ -1495,7 +1496,7 @@ private: auto state = MakeIntrusive<NYql::TS3State>(); state->Types = TypesCtx.Get(); state->FunctionRegistry = FuncRegistry; - state->CredentialsFactory = CredentialsFactory; + state->CredentialsFactory = FederatedQuerySetup->CredentialsFactory; // // TODO: Use TS3GatewayConfig from Kikimr Config when added @@ -1509,16 +1510,34 @@ private: } state->Configuration->Init(cfg, TypesCtx); - auto dataSource = NYql::CreateS3DataSource(state, HttpGateway); - auto dataSink = NYql::CreateS3DataSink(state, HttpGateway); + auto dataSource = NYql::CreateS3DataSource(state, FederatedQuerySetup->HttpGateway); + auto dataSink = NYql::CreateS3DataSink(state, FederatedQuerySetup->HttpGateway); TypesCtx->AddDataSource(NYql::S3ProviderName, std::move(dataSource)); TypesCtx->AddDataSink(NYql::S3ProviderName, std::move(dataSink)); } + void InitGenericProvider() { + if (!FederatedQuerySetup->ConnectorClient) { + return; + } + + auto state = MakeIntrusive<NYql::TGenericState>( + TypesCtx.Get(), + FuncRegistry, + FederatedQuerySetup->DatabaseAsyncResovler, + FederatedQuerySetup->ConnectorClient, + nullptr + ); + + TypesCtx->AddDataSource(NYql::GenericProviderName, NYql::CreateGenericDataSource(state)); + TypesCtx->AddDataSink(NYql::GenericProviderName, NYql::CreateGenericDataSink(state)); + } + void Init(EKikimrQueryType queryType) { - if (queryType == EKikimrQueryType::Script || queryType == EKikimrQueryType::Query) { + if ((queryType == EKikimrQueryType::Script || queryType == EKikimrQueryType::Query) && FederatedQuerySetup) { InitS3Provider(); + InitGenericProvider(); } KqpRunner = CreateKqpRunner(Gateway, Cluster, TypesCtx, SessionCtx, *FuncRegistry, TAppData::TimeProvider, TAppData::RandomProvider); @@ -1642,8 +1661,7 @@ private: IModuleResolver::TPtr ModuleResolver; bool KeepConfigChanges; bool IsInternalCall; - NYql::ISecuredServiceAccountCredentialsFactory::TPtr CredentialsFactory; - NYql::IHTTPGateway::TPtr HttpGateway; + std::optional<TKqpFederatedQuerySetup> FederatedQuerySetup; TIntrusivePtr<TKikimrSessionContext> SessionCtx; THashMap<TString, TString> ClustersMap; @@ -1683,11 +1701,12 @@ Ydb::Table::QueryStatsCollection::Mode GetStatsMode(NYql::EKikimrStatsMode stats TIntrusivePtr<IKqpHost> CreateKqpHost(TIntrusivePtr<IKqpGateway> gateway, const TString& cluster, const TString& database, TKikimrConfiguration::TPtr config, IModuleResolver::TPtr moduleResolver, - NYql::IHTTPGateway::TPtr httpGateway, const NKikimr::NMiniKQL::IFunctionRegistry* funcRegistry, bool keepConfigChanges, bool isInternalCall, - NYql::ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, TKqpTempTablesState::TConstPtr tempTablesState) + std::optional<TKqpFederatedQuerySetup> federatedQuerySetup, + const NKikimr::NMiniKQL::IFunctionRegistry* funcRegistry, bool keepConfigChanges, bool isInternalCall, + TKqpTempTablesState::TConstPtr tempTablesState) { - return MakeIntrusive<TKqpHost>(gateway, cluster, database, config, moduleResolver, std::move(httpGateway), funcRegistry, - keepConfigChanges, isInternalCall, std::move(credentialsFactory), std::move(tempTablesState)); + return MakeIntrusive<TKqpHost>(gateway, cluster, database, config, moduleResolver, federatedQuerySetup, funcRegistry, + keepConfigChanges, isInternalCall, std::move(tempTablesState)); } } // namespace NKqp diff --git a/ydb/core/kqp/host/kqp_host.h b/ydb/core/kqp/host/kqp_host.h index 644a1dfc085..4eaebf1558b 100644 --- a/ydb/core/kqp/host/kqp_host.h +++ b/ydb/core/kqp/host/kqp_host.h @@ -1,5 +1,6 @@ #pragma once +#include <ydb/core/kqp/federated_query/kqp_federated_query_helpers.h> #include <ydb/core/kqp/gateway/kqp_gateway.h> #include <ydb/core/kqp/provider/yql_kikimr_provider.h> #include <ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h> @@ -100,8 +101,8 @@ public: TIntrusivePtr<IKqpHost> CreateKqpHost(TIntrusivePtr<IKqpGateway> gateway, const TString& cluster, const TString& database, NYql::TKikimrConfiguration::TPtr config, NYql::IModuleResolver::TPtr moduleResolver, - NYql::IHTTPGateway::TPtr httpGateway, const NKikimr::NMiniKQL::IFunctionRegistry* funcRegistry = nullptr, bool keepConfigChanges = false, bool isInternalCall = false, - NYql::ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory = nullptr, TKqpTempTablesState::TConstPtr tempTablesState = nullptr); + std::optional<TKqpFederatedQuerySetup> federatedQuerySetup, const NKikimr::NMiniKQL::IFunctionRegistry* funcRegistry = nullptr, + bool keepConfigChanges = false, bool isInternalCall = false, TKqpTempTablesState::TConstPtr tempTablesState = nullptr); } // namespace NKqp } // namespace NKikimr diff --git a/ydb/core/kqp/host/ya.make b/ydb/core/kqp/host/ya.make index 7f722836aee..0b7ff0368bd 100644 --- a/ydb/core/kqp/host/ya.make +++ b/ydb/core/kqp/host/ya.make @@ -12,6 +12,7 @@ SRCS( PEERDIR( ydb/core/base ydb/core/kqp/common + ydb/core/kqp/federated_query ydb/core/kqp/opt ydb/core/kqp/provider ydb/core/tx/long_tx_service/public @@ -23,6 +24,7 @@ PEERDIR( ydb/library/yql/providers/common/http_gateway ydb/library/yql/providers/common/udf_resolve ydb/library/yql/providers/config + ydb/library/yql/providers/generic/provider ydb/library/yql/providers/result/provider ydb/library/yql/providers/s3/provider ) diff --git a/ydb/core/kqp/node_service/kqp_node_ut.cpp b/ydb/core/kqp/node_service/kqp_node_ut.cpp index e4380cf0621..a856f199a92 100644 --- a/ydb/core/kqp/node_service/kqp_node_ut.cpp +++ b/ydb/core/kqp/node_service/kqp_node_ut.cpp @@ -184,8 +184,8 @@ public: Runtime->EnableScheduleForActor(ResourceManagerActorId, true); WaitForBootstrap(); - auto httpGateway = NYql::IHTTPGateway::Make(); - auto asyncIoFactory = CreateKqpAsyncIoFactory(KqpCounters, httpGateway, nullptr); + auto FederatedQuerySetup = std::make_optional<TKqpFederatedQuerySetup>({NYql::IHTTPGateway::Make(), nullptr, nullptr, nullptr}); + auto asyncIoFactory = CreateKqpAsyncIoFactory(KqpCounters, FederatedQuerySetup); auto kqpNode = CreateKqpNodeService(config, KqpCounters, CompFactory.Get(), asyncIoFactory); KqpNodeActorId = Runtime->Register(kqpNode); Runtime->EnableScheduleForActor(KqpNodeActorId, true); diff --git a/ydb/core/kqp/proxy_service/CMakeLists.darwin-x86_64.txt b/ydb/core/kqp/proxy_service/CMakeLists.darwin-x86_64.txt index 74fe7fa6156..f8582de4c37 100644 --- a/ydb/core/kqp/proxy_service/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/kqp/proxy_service/CMakeLists.darwin-x86_64.txt @@ -34,6 +34,7 @@ target_link_libraries(core-kqp-proxy_service PUBLIC core-tx-schemeshard ydb-library-query_actor providers-common-http_gateway + providers-common-proto yql-public-issue api-protos public-lib-operation_id diff --git a/ydb/core/kqp/proxy_service/CMakeLists.linux-aarch64.txt b/ydb/core/kqp/proxy_service/CMakeLists.linux-aarch64.txt index 87e67e8adfe..8469df07477 100644 --- a/ydb/core/kqp/proxy_service/CMakeLists.linux-aarch64.txt +++ b/ydb/core/kqp/proxy_service/CMakeLists.linux-aarch64.txt @@ -35,6 +35,7 @@ target_link_libraries(core-kqp-proxy_service PUBLIC core-tx-schemeshard ydb-library-query_actor providers-common-http_gateway + providers-common-proto yql-public-issue api-protos public-lib-operation_id diff --git a/ydb/core/kqp/proxy_service/CMakeLists.linux-x86_64.txt b/ydb/core/kqp/proxy_service/CMakeLists.linux-x86_64.txt index 87e67e8adfe..8469df07477 100644 --- a/ydb/core/kqp/proxy_service/CMakeLists.linux-x86_64.txt +++ b/ydb/core/kqp/proxy_service/CMakeLists.linux-x86_64.txt @@ -35,6 +35,7 @@ target_link_libraries(core-kqp-proxy_service PUBLIC core-tx-schemeshard ydb-library-query_actor providers-common-http_gateway + providers-common-proto yql-public-issue api-protos public-lib-operation_id diff --git a/ydb/core/kqp/proxy_service/CMakeLists.windows-x86_64.txt b/ydb/core/kqp/proxy_service/CMakeLists.windows-x86_64.txt index 74fe7fa6156..f8582de4c37 100644 --- a/ydb/core/kqp/proxy_service/CMakeLists.windows-x86_64.txt +++ b/ydb/core/kqp/proxy_service/CMakeLists.windows-x86_64.txt @@ -34,6 +34,7 @@ target_link_libraries(core-kqp-proxy_service PUBLIC core-tx-schemeshard ydb-library-query_actor providers-common-http_gateway + providers-common-proto yql-public-issue api-protos public-lib-operation_id diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp index e3f6fd077db..c42ad4a89c9 100644 --- a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp +++ b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp @@ -105,16 +105,6 @@ TString EncodeSessionId(ui32 nodeId, const TString& id) { return NOperationId::ProtoToString(opId); } -void ParseGrpcEndpoint(const TString& endpoint, TString& address, bool& useSsl) { - TStringBuf scheme; - TStringBuf host; - TStringBuf uri; - NHttp::CrackURL(endpoint, scheme, host, uri); - - address = ToString(host); - useSsl = scheme == "grpcs"; -} - class TKqpProxyService : public TActorBootstrapped<TKqpProxyService> { struct TEvPrivate { enum EEv { @@ -169,43 +159,30 @@ public: TKqpProxyService(const NKikimrConfig::TLogConfig& logConfig, const NKikimrConfig::TTableServiceConfig& tableServiceConfig, - const NKikimrProto::TTokenAccessorConfig& tokenAccessorConfig, const NKikimrConfig::TQueryServiceConfig& queryServiceConfig, const NKikimrConfig::TMetadataProviderConfig& metadataProviderConfig, TVector<NKikimrKqp::TKqpSetting>&& settings, std::shared_ptr<IQueryReplayBackendFactory> queryReplayFactory, - std::shared_ptr<TKqpProxySharedResources>&& kqpProxySharedResources) - : HttpGatewayConfig(CreateHttpGatewayConfig()) - , LogConfig(logConfig) + std::shared_ptr<TKqpProxySharedResources>&& kqpProxySharedResources, + IKqpFederatedQuerySetupFactory::TPtr federatedQuerySetupFactory + ): LogConfig(logConfig) , TableServiceConfig(tableServiceConfig) - , TokenAccessorConfig(tokenAccessorConfig) , QueryServiceConfig(queryServiceConfig) , MetadataProviderConfig(metadataProviderConfig) , KqpSettings(std::make_shared<const TKqpSettings>(std::move(settings))) + , FederatedQuerySetupFactory(federatedQuerySetupFactory) , QueryReplayFactory(std::move(queryReplayFactory)) - , HttpGateway(NYql::IHTTPGateway::Make(&HttpGatewayConfig)) // TODO: pass config and counters , PendingRequests() , ModuleResolverState() , KqpProxySharedResources(std::move(kqpProxySharedResources)) {} void Bootstrap(const TActorContext &ctx) { - if (TokenAccessorConfig.GetEnabled()) { - TString caContent; - if (const auto& path = TokenAccessorConfig.GetSslCaCert()) { - caContent = TUnbufferedFileInput(path).ReadAll(); - } - - TString endpointAddress; - bool useSsl = false; - ParseGrpcEndpoint(TokenAccessorConfig.GetEndpoint(), endpointAddress, useSsl); - - CredentialsFactory = NYql::CreateSecuredServiceAccountCredentialsOverTokenAccessorFactory(endpointAddress, useSsl, caContent, TokenAccessorConfig.GetConnectionPoolSize()); - } - NLwTraceMonPage::ProbeRegistry().AddProbesList(LWTRACE_GET_PROBES(KQP_PROVIDER)); Counters = MakeIntrusive<TKqpCounters>(AppData()->Counters, &TlsActivationContext->AsActorContext()); - AsyncIoFactory = CreateKqpAsyncIoFactory(Counters, HttpGateway, CredentialsFactory); + // NOTE: some important actors are constructed within next call + FederatedQuerySetup = FederatedQuerySetupFactory->Make(ctx.ActorSystem()); + AsyncIoFactory = CreateKqpAsyncIoFactory(Counters, FederatedQuerySetup); ModuleResolverState = MakeIntrusive<TModuleResolverState>(); LocalSessions = std::make_unique<TLocalSessionsRegistry>(AppData()->RandomProvider); @@ -241,7 +218,7 @@ public: // Create compile service CompileService = TlsActivationContext->ExecutorThread.RegisterActor(CreateKqpCompileService(TableServiceConfig, MetadataProviderConfig, - KqpSettings, ModuleResolverState, Counters, std::move(QueryReplayFactory), CredentialsFactory, HttpGateway)); + KqpSettings, ModuleResolverState, Counters, std::move(QueryReplayFactory), FederatedQuerySetup)); TlsActivationContext->ExecutorThread.ActorSystem->RegisterLocalService( MakeKqpCompileServiceID(SelfId().NodeId()), CompileService); @@ -1365,7 +1342,7 @@ private: auto config = CreateConfig(KqpSettings, workerSettings); - IActor* sessionActor = CreateKqpSessionActor(SelfId(), sessionId, KqpSettings, workerSettings, HttpGateway, AsyncIoFactory, CredentialsFactory, ModuleResolverState, Counters, MetadataProviderConfig); + IActor* sessionActor = CreateKqpSessionActor(SelfId(), sessionId, KqpSettings, workerSettings, FederatedQuerySetup, AsyncIoFactory, ModuleResolverState, Counters, MetadataProviderConfig); auto workerId = TlsActivationContext->ExecutorThread.RegisterActor(sessionActor, TMailboxType::HTSwap, AppData()->UserPoolId); TKqpSessionInfo* sessionInfo = LocalSessions->Create( sessionId, workerId, database, dbCounters, supportsBalancing, GetSessionIdleDuration()); @@ -1545,27 +1522,16 @@ private: } } - static NYql::THttpGatewayConfig CreateHttpGatewayConfig() { - NYql::THttpGatewayConfig config; - config.SetMaxInFlightCount(2000); - config.SetMaxSimulatenousDownloadsSize(2000000000); - config.SetBuffersSizePerStream(5000000); - config.SetConnectionTimeoutSeconds(15); - config.SetRequestTimeoutSeconds(0); - return config; - } - private: - NYql::THttpGatewayConfig HttpGatewayConfig; NKikimrConfig::TLogConfig LogConfig; NKikimrConfig::TTableServiceConfig TableServiceConfig; - NKikimrProto::TTokenAccessorConfig TokenAccessorConfig; NKikimrConfig::TQueryServiceConfig QueryServiceConfig; NKikimrConfig::TMetadataProviderConfig MetadataProviderConfig; TKqpSettings::TConstPtr KqpSettings; - NYql::ISecuredServiceAccountCredentialsFactory::TPtr CredentialsFactory; + IKqpFederatedQuerySetupFactory::TPtr FederatedQuerySetupFactory; + std::optional<TKqpFederatedQuerySetup> FederatedQuerySetup; std::shared_ptr<IQueryReplayBackendFactory> QueryReplayFactory; - NYql::IHTTPGateway::TPtr HttpGateway; + NYql::NConnector::IClient::TPtr ConnectorClient; std::optional<TPeerStats> PeerStats; TKqpProxyRequestTracker PendingRequests; @@ -1614,15 +1580,16 @@ private: IActor* CreateKqpProxyService(const NKikimrConfig::TLogConfig& logConfig, const NKikimrConfig::TTableServiceConfig& tableServiceConfig, - const NKikimrProto::TTokenAccessorConfig& tokenAccessorConfig, const NKikimrConfig::TQueryServiceConfig& queryServiceConfig, const NKikimrConfig::TMetadataProviderConfig& metadataProviderConfig, TVector<NKikimrKqp::TKqpSetting>&& settings, std::shared_ptr<IQueryReplayBackendFactory> queryReplayFactory, - std::shared_ptr<TKqpProxySharedResources> kqpProxySharedResources) + std::shared_ptr<TKqpProxySharedResources> kqpProxySharedResources, + IKqpFederatedQuerySetupFactory::TPtr federatedQuerySetupFactory + ) { - return new TKqpProxyService(logConfig, tableServiceConfig, tokenAccessorConfig, queryServiceConfig, metadataProviderConfig, std::move(settings), - std::move(queryReplayFactory),std::move(kqpProxySharedResources)); + return new TKqpProxyService(logConfig, tableServiceConfig, queryServiceConfig, metadataProviderConfig, std::move(settings), + std::move(queryReplayFactory), std::move(kqpProxySharedResources), std::move(federatedQuerySetupFactory)); } } // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_service.h b/ydb/core/kqp/proxy_service/kqp_proxy_service.h index 648a6010e7d..c070b4df35f 100644 --- a/ydb/core/kqp/proxy_service/kqp_proxy_service.h +++ b/ydb/core/kqp/proxy_service/kqp_proxy_service.h @@ -1,11 +1,13 @@ #pragma once #include <ydb/core/base/appdata.h> +#include <ydb/core/kqp/federated_query/kqp_federated_query_helpers.h> #include <library/cpp/actors/core/actorid.h> #include <util/datetime/base.h> + namespace NKikimr::NKqp { class IQueryReplayBackendFactory; @@ -49,11 +51,12 @@ TPeerStats CalcPeerStats(const TVector<NKikimrKqp::TKqpProxyNodeResources>& data IActor* CreateKqpProxyService(const NKikimrConfig::TLogConfig& logConfig, const NKikimrConfig::TTableServiceConfig& tableServiceConfig, - const NKikimrProto::TTokenAccessorConfig& tokenAccessorConfig, const NKikimrConfig::TQueryServiceConfig& queryServiceConfig, const NKikimrConfig::TMetadataProviderConfig& metadataProviderConfig, TVector<NKikimrKqp::TKqpSetting>&& settings, std::shared_ptr<IQueryReplayBackendFactory> queryReplayFactory, - std::shared_ptr<TKqpProxySharedResources> kqpProxySharedResources); + std::shared_ptr<TKqpProxySharedResources> kqpProxySharedResources, + IKqpFederatedQuerySetupFactory::TPtr federatedQuerySetupFactory + ); } // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/proxy_service/ya.make b/ydb/core/kqp/proxy_service/ya.make index f7f95f81fa2..cef6cbd6640 100644 --- a/ydb/core/kqp/proxy_service/ya.make +++ b/ydb/core/kqp/proxy_service/ya.make @@ -26,6 +26,7 @@ PEERDIR( ydb/core/tx/schemeshard ydb/library/query_actor ydb/library/yql/providers/common/http_gateway + ydb/library/yql/providers/common/proto ydb/library/yql/public/issue ydb/public/api/protos ydb/public/lib/operation_id diff --git a/ydb/core/kqp/session_actor/CMakeLists.darwin-x86_64.txt b/ydb/core/kqp/session_actor/CMakeLists.darwin-x86_64.txt index ec5779b5b4a..7d60b1ef4fa 100644 --- a/ydb/core/kqp/session_actor/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/kqp/session_actor/CMakeLists.darwin-x86_64.txt @@ -16,7 +16,7 @@ target_link_libraries(core-kqp-session_actor PUBLIC yutil ydb-core-docapi core-kqp-common - providers-common-http_gateway + core-kqp-federated_query public-lib-operation_id ) target_sources(core-kqp-session_actor PRIVATE diff --git a/ydb/core/kqp/session_actor/CMakeLists.linux-aarch64.txt b/ydb/core/kqp/session_actor/CMakeLists.linux-aarch64.txt index c6febd786ae..cbb38f3df3e 100644 --- a/ydb/core/kqp/session_actor/CMakeLists.linux-aarch64.txt +++ b/ydb/core/kqp/session_actor/CMakeLists.linux-aarch64.txt @@ -17,7 +17,7 @@ target_link_libraries(core-kqp-session_actor PUBLIC yutil ydb-core-docapi core-kqp-common - providers-common-http_gateway + core-kqp-federated_query public-lib-operation_id ) target_sources(core-kqp-session_actor PRIVATE diff --git a/ydb/core/kqp/session_actor/CMakeLists.linux-x86_64.txt b/ydb/core/kqp/session_actor/CMakeLists.linux-x86_64.txt index c6febd786ae..cbb38f3df3e 100644 --- a/ydb/core/kqp/session_actor/CMakeLists.linux-x86_64.txt +++ b/ydb/core/kqp/session_actor/CMakeLists.linux-x86_64.txt @@ -17,7 +17,7 @@ target_link_libraries(core-kqp-session_actor PUBLIC yutil ydb-core-docapi core-kqp-common - providers-common-http_gateway + core-kqp-federated_query public-lib-operation_id ) target_sources(core-kqp-session_actor PRIVATE diff --git a/ydb/core/kqp/session_actor/CMakeLists.windows-x86_64.txt b/ydb/core/kqp/session_actor/CMakeLists.windows-x86_64.txt index ec5779b5b4a..7d60b1ef4fa 100644 --- a/ydb/core/kqp/session_actor/CMakeLists.windows-x86_64.txt +++ b/ydb/core/kqp/session_actor/CMakeLists.windows-x86_64.txt @@ -16,7 +16,7 @@ target_link_libraries(core-kqp-session_actor PUBLIC yutil ydb-core-docapi core-kqp-common - providers-common-http_gateway + core-kqp-federated_query public-lib-operation_id ) target_sources(core-kqp-session_actor PRIVATE diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp index a5060c0ad79..d0705e0132a 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp @@ -149,19 +149,18 @@ public: } TKqpSessionActor(const TActorId& owner, const TString& sessionId, const TKqpSettings::TConstPtr& kqpSettings, - const TKqpWorkerSettings& workerSettings, NYql::IHTTPGateway::TPtr httpGateway, + const TKqpWorkerSettings& workerSettings, + std::optional<TKqpFederatedQuerySetup> federatedQuerySetup, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, - NYql::ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, TIntrusivePtr<TModuleResolverState> moduleResolverState, TIntrusivePtr<TKqpCounters> counters, const NKikimrConfig::TMetadataProviderConfig& metadataProviderConfig) : Owner(owner) , SessionId(sessionId) , Counters(counters) , Settings(workerSettings) - , HttpGateway(std::move(httpGateway)) , AsyncIoFactory(std::move(asyncIoFactory)) - , CredentialsFactory(std::move(credentialsFactory)) , ModuleResolverState(std::move(moduleResolverState)) + , FederatedQuerySetup(federatedQuerySetup) , KqpSettings(kqpSettings) , Config(CreateConfig(kqpSettings, workerSettings)) , Transactions(*Config->_KqpMaxActiveTxPerSession.Get(), TDuration::Seconds(*Config->_KqpTxIdleTimeoutSec.Get())) @@ -222,7 +221,7 @@ public: void ForwardRequest(TEvKqp::TEvQueryRequest::TPtr& ev) { if (!WorkerId) { std::unique_ptr<IActor> workerActor(CreateKqpWorkerActor(SelfId(), SessionId, KqpSettings, Settings, - HttpGateway, ModuleResolverState, Counters, CredentialsFactory, MetadataProviderConfig)); + FederatedQuerySetup, ModuleResolverState, Counters, MetadataProviderConfig)); WorkerId = RegisterWithSameMailbox(workerActor.release()); } TlsActivationContext->Send(new IEventHandle(*WorkerId, SelfId(), QueryState->RequestEv.release(), ev->Flags, ev->Cookie, @@ -2131,10 +2130,9 @@ private: TIntrusivePtr<TKqpCounters> Counters; TIntrusivePtr<TKqpRequestCounters> RequestCounters; TKqpWorkerSettings Settings; - NYql::IHTTPGateway::TPtr HttpGateway; NYql::NDq::IDqAsyncIoFactory::TPtr AsyncIoFactory; - NYql::ISecuredServiceAccountCredentialsFactory::TPtr CredentialsFactory; TIntrusivePtr<TModuleResolverState> ModuleResolverState; + std::optional<TKqpFederatedQuerySetup> FederatedQuerySetup; TKqpSettings::TConstPtr KqpSettings; std::optional<TActorId> WorkerId; TActorId ExecuterId; @@ -2159,12 +2157,15 @@ private: IActor* CreateKqpSessionActor(const TActorId& owner, const TString& sessionId, const TKqpSettings::TConstPtr& kqpSettings, const TKqpWorkerSettings& workerSettings, - NYql::IHTTPGateway::TPtr httpGateway, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, - NYql::ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, + std::optional<TKqpFederatedQuerySetup> federatedQuerySetup, + NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, TIntrusivePtr<TModuleResolverState> moduleResolverState, TIntrusivePtr<TKqpCounters> counters, const NKikimrConfig::TMetadataProviderConfig& metadataProviderConfig) { - return new TKqpSessionActor(owner, sessionId, kqpSettings, workerSettings, std::move(httpGateway), std::move(asyncIoFactory), std::move(credentialsFactory), std::move(moduleResolverState), counters, metadataProviderConfig); + return new TKqpSessionActor(owner, sessionId, kqpSettings, workerSettings, federatedQuerySetup, + std::move(asyncIoFactory), std::move(moduleResolverState), counters, + metadataProviderConfig + ); } } diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.h b/ydb/core/kqp/session_actor/kqp_session_actor.h index fd6f9fd381b..548bce979a7 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.h +++ b/ydb/core/kqp/session_actor/kqp_session_actor.h @@ -1,11 +1,10 @@ #pragma once +#include <ydb/core/kqp/common/simple/temp_tables.h> #include <ydb/core/kqp/counters/kqp_counters.h> +#include <ydb/core/kqp/federated_query/kqp_federated_query_helpers.h> #include <ydb/core/kqp/gateway/kqp_gateway.h> #include <ydb/core/protos/config.pb.h> -#include <ydb/core/kqp/common/simple/temp_tables.h> -#include <ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h> -#include <ydb/library/yql/providers/common/token_accessor/client/factory.h> #include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io_factory.h> #include <library/cpp/actors/core/actorid.h> @@ -35,10 +34,11 @@ struct TKqpWorkerSettings { IActor* CreateKqpSessionActor(const TActorId& owner, const TString& sessionId, const TKqpSettings::TConstPtr& kqpSettings, const TKqpWorkerSettings& workerSettings, - NYql::IHTTPGateway::TPtr httpGateway, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, - NYql::ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, + std::optional<TKqpFederatedQuerySetup> federatedQuerySetup, + NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, TIntrusivePtr<TModuleResolverState> moduleResolverState, TIntrusivePtr<TKqpCounters> counters, - const NKikimrConfig::TMetadataProviderConfig& metadataProviderConfig); + const NKikimrConfig::TMetadataProviderConfig& metadataProviderConfig + ); IActor* CreateKqpTempTablesManager(TKqpTempTablesState tempTablesState, const TActorId& target); diff --git a/ydb/core/kqp/session_actor/kqp_worker_actor.cpp b/ydb/core/kqp/session_actor/kqp_worker_actor.cpp index 6cf0b17d17a..c10a443d14a 100644 --- a/ydb/core/kqp/session_actor/kqp_worker_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_worker_actor.cpp @@ -95,17 +95,16 @@ public: } TKqpWorkerActor(const TActorId& owner, const TString& sessionId, const TKqpSettings::TConstPtr& kqpSettings, - const TKqpWorkerSettings& workerSettings, NYql::IHTTPGateway::TPtr httpGateway, + const TKqpWorkerSettings& workerSettings, std::optional<TKqpFederatedQuerySetup> federatedQuerySetup, TIntrusivePtr<TModuleResolverState> moduleResolverState, TIntrusivePtr<TKqpCounters> counters, - NYql::ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, - const NKikimrConfig::TMetadataProviderConfig& metadataProviderConfig) + const NKikimrConfig::TMetadataProviderConfig& metadataProviderConfig + ) : Owner(owner) , SessionId(sessionId) , Settings(workerSettings) - , HttpGateway(std::move(httpGateway)) + , FederatedQuerySetup(federatedQuerySetup) , ModuleResolverState(moduleResolverState) , Counters(counters) - , CredentialsFactory(std::move(credentialsFactory)) , Config(MakeIntrusive<TKikimrConfiguration>()) , MetadataProviderConfig(metadataProviderConfig) , CreationTime(TInstant::Now()) @@ -143,7 +142,7 @@ public: Config->FeatureFlags = AppData(ctx)->FeatureFlags; KqpHost = CreateKqpHost(Gateway, Settings.Cluster, Settings.Database, Config, ModuleResolverState->ModuleResolver, - HttpGateway, AppData(ctx)->FunctionRegistry, !Settings.LongSession, false, CredentialsFactory); + FederatedQuerySetup, AppData(ctx)->FunctionRegistry, !Settings.LongSession, false); Become(&TKqpWorkerActor::ReadyState); } @@ -1060,10 +1059,9 @@ private: TActorId Owner; TString SessionId; TKqpWorkerSettings Settings; - NYql::IHTTPGateway::TPtr HttpGateway; + std::optional<TKqpFederatedQuerySetup> FederatedQuerySetup; TIntrusivePtr<TModuleResolverState> ModuleResolverState; TIntrusivePtr<TKqpCounters> Counters; - NYql::ISecuredServiceAccountCredentialsFactory::TPtr CredentialsFactory; TIntrusivePtr<TKqpRequestCounters> RequestCounters; TKikimrConfiguration::TPtr Config; NKikimrConfig::TMetadataProviderConfig MetadataProviderConfig; @@ -1080,12 +1078,13 @@ private: IActor* CreateKqpWorkerActor(const TActorId& owner, const TString& sessionId, const TKqpSettings::TConstPtr& kqpSettings, const TKqpWorkerSettings& workerSettings, - NYql::IHTTPGateway::TPtr httpGateway, + std::optional<TKqpFederatedQuerySetup> federatedQuerySetup, TIntrusivePtr<TModuleResolverState> moduleResolverState, TIntrusivePtr<TKqpCounters> counters, - NYql::ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, - const NKikimrConfig::TMetadataProviderConfig& metadataProviderConfig) + const NKikimrConfig::TMetadataProviderConfig& metadataProviderConfig + ) { - return new TKqpWorkerActor(owner, sessionId, kqpSettings, workerSettings, std::move(httpGateway), moduleResolverState, counters, std::move(credentialsFactory), metadataProviderConfig); + return new TKqpWorkerActor(owner, sessionId, kqpSettings, workerSettings, federatedQuerySetup, + moduleResolverState, counters, metadataProviderConfig); } } // namespace NKqp diff --git a/ydb/core/kqp/session_actor/kqp_worker_common.h b/ydb/core/kqp/session_actor/kqp_worker_common.h index c7254a4fa8a..d0d501ff1b9 100644 --- a/ydb/core/kqp/session_actor/kqp_worker_common.h +++ b/ydb/core/kqp/session_actor/kqp_worker_common.h @@ -8,8 +8,6 @@ #include <ydb/core/kqp/provider/yql_kikimr_provider.h> #include <ydb/core/kqp/provider/yql_kikimr_settings.h> #include <ydb/core/protos/kqp.pb.h> -#include <ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h> -#include <ydb/library/yql/providers/common/token_accessor/client/factory.h> #include <library/cpp/actors/core/actor.h> #include <library/cpp/actors/core/log.h> @@ -147,9 +145,11 @@ bool HasSchemeOrFatalIssues(const NYql::TIssues& issues); IActor* CreateKqpWorkerActor(const TActorId& owner, const TString& sessionId, const TKqpSettings::TConstPtr& kqpSettings, const TKqpWorkerSettings& workerSettings, - NYql::IHTTPGateway::TPtr httpGateway, TIntrusivePtr<TModuleResolverState> moduleResolverState, - TIntrusivePtr<TKqpCounters> counters, NYql::ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, - const NKikimrConfig::TMetadataProviderConfig& metadataProviderConfig); + std::optional<TKqpFederatedQuerySetup> federatedQuerySetup, + TIntrusivePtr<TModuleResolverState> moduleResolverState, + TIntrusivePtr<TKqpCounters> counters, + const NKikimrConfig::TMetadataProviderConfig& metadataProviderConfig + ); bool IsSameProtoType(const NKikimrMiniKQL::TType& actual, const NKikimrMiniKQL::TType& expected); diff --git a/ydb/core/kqp/session_actor/ya.make b/ydb/core/kqp/session_actor/ya.make index 04693ca4fdb..49d8ebfe83b 100644 --- a/ydb/core/kqp/session_actor/ya.make +++ b/ydb/core/kqp/session_actor/ya.make @@ -13,7 +13,7 @@ SRCS( PEERDIR( ydb/core/docapi ydb/core/kqp/common - ydb/library/yql/providers/common/http_gateway + ydb/core/kqp/federated_query ydb/public/lib/operation_id ) diff --git a/ydb/core/kqp/ut/common/CMakeLists.darwin-x86_64.txt b/ydb/core/kqp/ut/common/CMakeLists.darwin-x86_64.txt index 9b01d3392d7..8ebd0100ee4 100644 --- a/ydb/core/kqp/ut/common/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/kqp/ut/common/CMakeLists.darwin-x86_64.txt @@ -14,6 +14,7 @@ target_compile_options(kqp-ut-common PRIVATE target_link_libraries(kqp-ut-common PUBLIC contrib-libs-cxxsupp yutil + core-kqp-federated_query ydb-core-testlib yql-public-udf string_udf diff --git a/ydb/core/kqp/ut/common/CMakeLists.linux-aarch64.txt b/ydb/core/kqp/ut/common/CMakeLists.linux-aarch64.txt index d54f6db0bf1..091762509e9 100644 --- a/ydb/core/kqp/ut/common/CMakeLists.linux-aarch64.txt +++ b/ydb/core/kqp/ut/common/CMakeLists.linux-aarch64.txt @@ -15,6 +15,7 @@ target_link_libraries(kqp-ut-common PUBLIC contrib-libs-linux-headers contrib-libs-cxxsupp yutil + core-kqp-federated_query ydb-core-testlib yql-public-udf string_udf diff --git a/ydb/core/kqp/ut/common/CMakeLists.linux-x86_64.txt b/ydb/core/kqp/ut/common/CMakeLists.linux-x86_64.txt index d54f6db0bf1..091762509e9 100644 --- a/ydb/core/kqp/ut/common/CMakeLists.linux-x86_64.txt +++ b/ydb/core/kqp/ut/common/CMakeLists.linux-x86_64.txt @@ -15,6 +15,7 @@ target_link_libraries(kqp-ut-common PUBLIC contrib-libs-linux-headers contrib-libs-cxxsupp yutil + core-kqp-federated_query ydb-core-testlib yql-public-udf string_udf diff --git a/ydb/core/kqp/ut/common/CMakeLists.windows-x86_64.txt b/ydb/core/kqp/ut/common/CMakeLists.windows-x86_64.txt index 9b01d3392d7..8ebd0100ee4 100644 --- a/ydb/core/kqp/ut/common/CMakeLists.windows-x86_64.txt +++ b/ydb/core/kqp/ut/common/CMakeLists.windows-x86_64.txt @@ -14,6 +14,7 @@ target_compile_options(kqp-ut-common PRIVATE target_link_libraries(kqp-ut-common PUBLIC contrib-libs-cxxsupp yutil + core-kqp-federated_query ydb-core-testlib yql-public-udf string_udf diff --git a/ydb/core/kqp/ut/common/kqp_ut_common.cpp b/ydb/core/kqp/ut/common/kqp_ut_common.cpp index fa2fc041c6e..02d2543f15b 100644 --- a/ydb/core/kqp/ut/common/kqp_ut_common.cpp +++ b/ydb/core/kqp/ut/common/kqp_ut_common.cpp @@ -131,6 +131,10 @@ TKikimrRunner::TKikimrRunner(const TKikimrSettings& settings) { if (settings.LogStream) ServerSettings->SetLogBackend(new TStreamLogBackend(settings.LogStream)); + if (settings.FederatedQuerySetupFactory) { + ServerSettings->SetFederatedQuerySetupFactory(settings.FederatedQuerySetupFactory); + } + Server.Reset(MakeHolder<Tests::TServer>(*ServerSettings)); Server->EnableGRpc(grpcPort); Server->SetupDefaultProfiles(); diff --git a/ydb/core/kqp/ut/common/kqp_ut_common.h b/ydb/core/kqp/ut/common/kqp_ut_common.h index 22e76d202b1..44bd0035f38 100644 --- a/ydb/core/kqp/ut/common/kqp_ut_common.h +++ b/ydb/core/kqp/ut/common/kqp_ut_common.h @@ -1,8 +1,9 @@ #pragma once #include <ydb/core/testlib/test_client.h> - +#include <ydb/core/kqp/federated_query/kqp_federated_query_helpers.h> #include <ydb/core/tx/scheme_cache/scheme_cache.h> +#include <ydb/library/yql/core/issue/yql_issue.h> #include <ydb/public/lib/yson_value/ydb_yson_value.h> #include <ydb/public/sdk/cpp/client/ydb_query/client.h> #include <ydb/public/sdk/cpp/client/draft/ydb_scripting.h> @@ -10,9 +11,6 @@ #include <ydb/public/sdk/cpp/client/ydb_table/table.h> #include <library/cpp/yson/node/node_io.h> - -#include <ydb/library/yql/core/issue/yql_issue.h> - #include <library/cpp/json/json_reader.h> #include <library/cpp/testing/unittest/tests_data.h> #include <library/cpp/testing/unittest/registar.h> @@ -83,6 +81,7 @@ struct TKikimrSettings: public TTestFeatureFlagsHolder<TKikimrSettings> { TDuration KeepSnapshotTimeout = TDuration::Zero(); IOutputStream* LogStream = nullptr; TMaybe<NFake::TStorage> Storage = Nothing(); + NKqp::IKqpFederatedQuerySetupFactory::TPtr FederatedQuerySetupFactory = std::make_shared<NKqp::TKqpFederatedQuerySetupFactoryNoop>(); TKikimrSettings() { @@ -106,6 +105,7 @@ struct TKikimrSettings: public TTestFeatureFlagsHolder<TKikimrSettings> { TKikimrSettings& SetKeepSnapshotTimeout(TDuration value) { KeepSnapshotTimeout = value; return *this; } TKikimrSettings& SetLogStream(IOutputStream* follower) { LogStream = follower; return *this; }; TKikimrSettings& SetStorage(const NFake::TStorage& storage) { Storage = storage; return *this; }; + TKikimrSettings& SetFederatedQuerySetupFactory(NKqp::IKqpFederatedQuerySetupFactory::TPtr value) { FederatedQuerySetupFactory = value; return *this; }; }; class TKikimrRunner { diff --git a/ydb/core/kqp/ut/common/ya.make b/ydb/core/kqp/ut/common/ya.make index 34c9747558b..30978c9412b 100644 --- a/ydb/core/kqp/ut/common/ya.make +++ b/ydb/core/kqp/ut/common/ya.make @@ -9,6 +9,7 @@ SRCS( ) PEERDIR( + ydb/core/kqp/federated_query ydb/core/testlib ydb/library/yql/public/udf ydb/library/yql/udfs/common/string diff --git a/ydb/core/kqp/ut/federated_query/CMakeLists.txt b/ydb/core/kqp/ut/federated_query/CMakeLists.txt index f8b31df0c11..f9b0afd798f 100644 --- a/ydb/core/kqp/ut/federated_query/CMakeLists.txt +++ b/ydb/core/kqp/ut/federated_query/CMakeLists.txt @@ -6,12 +6,5 @@ # original buildsystem will not be accepted. -if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA) - include(CMakeLists.linux-aarch64.txt) -elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64") - include(CMakeLists.darwin-x86_64.txt) -elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA) - include(CMakeLists.windows-x86_64.txt) -elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA) - include(CMakeLists.linux-x86_64.txt) -endif() +add_subdirectory(common) +add_subdirectory(s3) diff --git a/ydb/core/kqp/ut/federated_query/common/CMakeLists.darwin-x86_64.txt b/ydb/core/kqp/ut/federated_query/common/CMakeLists.darwin-x86_64.txt new file mode 100644 index 00000000000..bbe698f755e --- /dev/null +++ b/ydb/core/kqp/ut/federated_query/common/CMakeLists.darwin-x86_64.txt @@ -0,0 +1,23 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(ut-federated_query-common) +target_compile_options(ut-federated_query-common PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_link_libraries(ut-federated_query-common PUBLIC + contrib-libs-cxxsupp + yutil + kqp-ut-common + cpp-client-ydb_operation + cpp-client-ydb_query +) +target_sources(ut-federated_query-common PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/kqp/ut/federated_query/common/common.cpp +) diff --git a/ydb/core/kqp/ut/federated_query/common/CMakeLists.linux-aarch64.txt b/ydb/core/kqp/ut/federated_query/common/CMakeLists.linux-aarch64.txt new file mode 100644 index 00000000000..c228a1c938b --- /dev/null +++ b/ydb/core/kqp/ut/federated_query/common/CMakeLists.linux-aarch64.txt @@ -0,0 +1,24 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(ut-federated_query-common) +target_compile_options(ut-federated_query-common PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_link_libraries(ut-federated_query-common PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + kqp-ut-common + cpp-client-ydb_operation + cpp-client-ydb_query +) +target_sources(ut-federated_query-common PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/kqp/ut/federated_query/common/common.cpp +) diff --git a/ydb/core/kqp/ut/federated_query/common/CMakeLists.linux-x86_64.txt b/ydb/core/kqp/ut/federated_query/common/CMakeLists.linux-x86_64.txt new file mode 100644 index 00000000000..c228a1c938b --- /dev/null +++ b/ydb/core/kqp/ut/federated_query/common/CMakeLists.linux-x86_64.txt @@ -0,0 +1,24 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(ut-federated_query-common) +target_compile_options(ut-federated_query-common PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_link_libraries(ut-federated_query-common PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + kqp-ut-common + cpp-client-ydb_operation + cpp-client-ydb_query +) +target_sources(ut-federated_query-common PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/kqp/ut/federated_query/common/common.cpp +) diff --git a/ydb/core/kqp/ut/federated_query/common/CMakeLists.txt b/ydb/core/kqp/ut/federated_query/common/CMakeLists.txt new file mode 100644 index 00000000000..f8b31df0c11 --- /dev/null +++ b/ydb/core/kqp/ut/federated_query/common/CMakeLists.txt @@ -0,0 +1,17 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-aarch64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64") + include(CMakeLists.darwin-x86_64.txt) +elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA) + include(CMakeLists.windows-x86_64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-x86_64.txt) +endif() diff --git a/ydb/core/kqp/ut/federated_query/common/CMakeLists.windows-x86_64.txt b/ydb/core/kqp/ut/federated_query/common/CMakeLists.windows-x86_64.txt new file mode 100644 index 00000000000..bbe698f755e --- /dev/null +++ b/ydb/core/kqp/ut/federated_query/common/CMakeLists.windows-x86_64.txt @@ -0,0 +1,23 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(ut-federated_query-common) +target_compile_options(ut-federated_query-common PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_link_libraries(ut-federated_query-common PUBLIC + contrib-libs-cxxsupp + yutil + kqp-ut-common + cpp-client-ydb_operation + cpp-client-ydb_query +) +target_sources(ut-federated_query-common PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/kqp/ut/federated_query/common/common.cpp +) diff --git a/ydb/core/kqp/ut/federated_query/common/common.cpp b/ydb/core/kqp/ut/federated_query/common/common.cpp new file mode 100644 index 00000000000..1c8faf78925 --- /dev/null +++ b/ydb/core/kqp/ut/federated_query/common/common.cpp @@ -0,0 +1,44 @@ +#include "common.h" + +#include <library/cpp/testing/unittest/registar.h> + +namespace NKikimr::NKqp::NFederatedQueryTest { + + NYdb::NQuery::TScriptExecutionOperation WaitScriptExecutionOperation(const NYdb::TOperation::TOperationId& operationId, const NYdb::TDriver& ydbDriver) { + NYdb::NOperation::TOperationClient client(ydbDriver); + NThreading::TFuture<NYdb::NQuery::TScriptExecutionOperation> op; + do { + if (op.Initialized()) { + Sleep(TDuration::MilliSeconds(10)); + } + op = client.Get<NYdb::NQuery::TScriptExecutionOperation>(operationId); + UNIT_ASSERT_C(op.GetValueSync().Status().IsSuccess(), TStringBuilder() << op.GetValueSync().Status().GetStatus() << ":" << op.GetValueSync().Status().GetIssues().ToString()); + } while (!op.GetValueSync().Ready()); + return op.GetValueSync(); + } + + std::shared_ptr<TKikimrRunner> MakeKikimrRunner( + NYql::IHTTPGateway::TPtr httpGateway, + NYql::NConnector::IClient::TPtr connectorClient, + std::optional<NKikimrConfig::TAppConfig> appConfig) + { + NKikimrConfig::TFeatureFlags featureFlags; + featureFlags.SetEnableExternalDataSources(true); + featureFlags.SetEnableScriptExecutionOperations(true); + + auto federatedQuerySetupFactory = std::make_shared<TKqpFederatedQuerySetupFactoryMock>( + httpGateway, connectorClient, nullptr, nullptr + ); + + auto settings = TKikimrSettings() + .SetFeatureFlags(featureFlags) + .SetFederatedQuerySetupFactory(federatedQuerySetupFactory) + .SetKqpSettings({}) + .SetEnableScriptExecutionOperations(true); + + settings = appConfig ? settings.SetAppConfig(appConfig.value()) : settings.SetAppConfig({}); + + return std::make_shared<TKikimrRunner>(settings); + } + +} diff --git a/ydb/core/kqp/ut/federated_query/common/common.h b/ydb/core/kqp/ut/federated_query/common/common.h new file mode 100644 index 00000000000..e55b2961d2a --- /dev/null +++ b/ydb/core/kqp/ut/federated_query/common/common.h @@ -0,0 +1,19 @@ +#pragma once + +#include <ydb/core/kqp/ut/common/kqp_ut_common.h> +#include <ydb/public/sdk/cpp/client/ydb_query/query.h> +#include <ydb/public/sdk/cpp/client/ydb_operation/operation.h> + +namespace NKikimr::NKqp::NFederatedQueryTest { + using namespace NKikimr::NKqp; + + NYdb::NQuery::TScriptExecutionOperation WaitScriptExecutionOperation( + const NYdb::TOperation::TOperationId& operationId, + const NYdb::TDriver& ydbDriver); + + std::shared_ptr<TKikimrRunner> MakeKikimrRunner( + NYql::IHTTPGateway::TPtr httpGateway = nullptr, + NYql::NConnector::IClient::TPtr connectorClient = nullptr, + std::optional<NKikimrConfig::TAppConfig> appConfig = std::nullopt + ); +} diff --git a/ydb/core/kqp/ut/federated_query/common/ya.make b/ydb/core/kqp/ut/federated_query/common/ya.make new file mode 100644 index 00000000000..de8a77b04b8 --- /dev/null +++ b/ydb/core/kqp/ut/federated_query/common/ya.make @@ -0,0 +1,15 @@ +LIBRARY() + +SRCS( + common.cpp +) + +PEERDIR( + ydb/core/kqp/ut/common + ydb/public/sdk/cpp/client/ydb_operation + ydb/public/sdk/cpp/client/ydb_query +) + +YQL_LAST_ABI_VERSION() + +END() diff --git a/ydb/core/kqp/ut/federated_query/run_testpack b/ydb/core/kqp/ut/federated_query/run_testpack deleted file mode 100755 index 5be30f1a7d7..00000000000 --- a/ydb/core/kqp/ut/federated_query/run_testpack +++ /dev/null @@ -1,28 +0,0 @@ -#!/usr/bin/env bash -set -e -CTX_DIR=$(mktemp -d) -echo Working dir: $CTX_DIR -cd "$CTX_DIR" - -# Start server -MOTO_SERVER_EXECUTABLE=moto_server $source_root/ydb/tests/tools/s3_recipe/start.sh $CTX_DIR - -# Run test -set +e -S3_ENDPOINT=$(cat $CTX_DIR/env.json | grep 'S3_ENDPOINT' | cut -f 4 -d '"') \ -$build_root/ydb/core/kqp/ut/federated_query/ydb-core-kqp-ut-federated_query $* - -code=$? -if [ $code -gt 0 ];then - echo - echo "Test execution failed" - echo -fi - -# Stop server (removes working dir!) -$source_root/ydb/tests/tools/s3_recipe/stop.sh $CTX_DIR - -# Return result code from test run -if [ $code -gt 0 ];then - exit $code -fi diff --git a/ydb/core/kqp/ut/federated_query/CMakeLists.darwin-x86_64.txt b/ydb/core/kqp/ut/federated_query/s3/CMakeLists.darwin-x86_64.txt index 20bf70d4a8e..7498df42dfa 100644 --- a/ydb/core/kqp/ut/federated_query/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/kqp/ut/federated_query/s3/CMakeLists.darwin-x86_64.txt @@ -7,14 +7,14 @@ -add_executable(ydb-core-kqp-ut-federated_query) -target_compile_options(ydb-core-kqp-ut-federated_query PRIVATE +add_executable(ydb-core-kqp-ut-federated_query-s3) +target_compile_options(ydb-core-kqp-ut-federated_query-s3 PRIVATE -DUSE_CURRENT_UDF_ABI_VERSION ) -target_include_directories(ydb-core-kqp-ut-federated_query PRIVATE +target_include_directories(ydb-core-kqp-ut-federated_query-s3 PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kqp ) -target_link_libraries(ydb-core-kqp-ut-federated_query PUBLIC +target_link_libraries(ydb-core-kqp-ut-federated_query-s3 PUBLIC contrib-libs-cxxsupp yutil library-cpp-cpuid_check @@ -22,32 +22,32 @@ target_link_libraries(ydb-core-kqp-ut-federated_query PUBLIC ydb-core-kqp libs-aws-sdk-cpp-aws-cpp-sdk-s3 kqp-ut-common + ut-federated_query-common yql-sql-pg_dummy - cpp-client-ydb_operation client-ydb_types-operation ) -target_link_options(ydb-core-kqp-ut-federated_query PRIVATE +target_link_options(ydb-core-kqp-ut-federated_query-s3 PRIVATE -Wl,-platform_version,macos,11.0,11.0 -fPIC -fPIC -framework CoreFoundation ) -target_sources(ydb-core-kqp-ut-federated_query PRIVATE - ${CMAKE_SOURCE_DIR}/ydb/core/kqp/ut/federated_query/kqp_federated_query_ut.cpp +target_sources(ydb-core-kqp-ut-federated_query-s3 PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/kqp/ut/federated_query/s3/kqp_federated_query_ut.cpp ) set_property( TARGET - ydb-core-kqp-ut-federated_query + ydb-core-kqp-ut-federated_query-s3 PROPERTY SPLIT_FACTOR 1 ) add_yunittest( NAME - ydb-core-kqp-ut-federated_query + ydb-core-kqp-ut-federated_query-s3 TEST_TARGET - ydb-core-kqp-ut-federated_query + ydb-core-kqp-ut-federated_query-s3 TEST_ARG --print-before-suite --print-before-test @@ -57,26 +57,26 @@ add_yunittest( ) set_yunittest_property( TEST - ydb-core-kqp-ut-federated_query + ydb-core-kqp-ut-federated_query-s3 PROPERTY LABELS MEDIUM ) set_yunittest_property( TEST - ydb-core-kqp-ut-federated_query + ydb-core-kqp-ut-federated_query-s3 PROPERTY PROCESSORS 1 ) set_yunittest_property( TEST - ydb-core-kqp-ut-federated_query + ydb-core-kqp-ut-federated_query-s3 PROPERTY TIMEOUT 600 ) -target_allocator(ydb-core-kqp-ut-federated_query +target_allocator(ydb-core-kqp-ut-federated_query-s3 system_allocator ) -vcs_info(ydb-core-kqp-ut-federated_query) +vcs_info(ydb-core-kqp-ut-federated_query-s3) diff --git a/ydb/core/kqp/ut/federated_query/CMakeLists.linux-aarch64.txt b/ydb/core/kqp/ut/federated_query/s3/CMakeLists.linux-aarch64.txt index 3f8fc55a471..bd342c62e11 100644 --- a/ydb/core/kqp/ut/federated_query/CMakeLists.linux-aarch64.txt +++ b/ydb/core/kqp/ut/federated_query/s3/CMakeLists.linux-aarch64.txt @@ -7,14 +7,14 @@ -add_executable(ydb-core-kqp-ut-federated_query) -target_compile_options(ydb-core-kqp-ut-federated_query PRIVATE +add_executable(ydb-core-kqp-ut-federated_query-s3) +target_compile_options(ydb-core-kqp-ut-federated_query-s3 PRIVATE -DUSE_CURRENT_UDF_ABI_VERSION ) -target_include_directories(ydb-core-kqp-ut-federated_query PRIVATE +target_include_directories(ydb-core-kqp-ut-federated_query-s3 PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kqp ) -target_link_libraries(ydb-core-kqp-ut-federated_query PUBLIC +target_link_libraries(ydb-core-kqp-ut-federated_query-s3 PUBLIC contrib-libs-linux-headers contrib-libs-cxxsupp yutil @@ -22,11 +22,11 @@ target_link_libraries(ydb-core-kqp-ut-federated_query PUBLIC ydb-core-kqp libs-aws-sdk-cpp-aws-cpp-sdk-s3 kqp-ut-common + ut-federated_query-common yql-sql-pg_dummy - cpp-client-ydb_operation client-ydb_types-operation ) -target_link_options(ydb-core-kqp-ut-federated_query PRIVATE +target_link_options(ydb-core-kqp-ut-federated_query-s3 PRIVATE -ldl -lrt -Wl,--no-as-needed @@ -36,21 +36,21 @@ target_link_options(ydb-core-kqp-ut-federated_query PRIVATE -lrt -ldl ) -target_sources(ydb-core-kqp-ut-federated_query PRIVATE - ${CMAKE_SOURCE_DIR}/ydb/core/kqp/ut/federated_query/kqp_federated_query_ut.cpp +target_sources(ydb-core-kqp-ut-federated_query-s3 PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/kqp/ut/federated_query/s3/kqp_federated_query_ut.cpp ) set_property( TARGET - ydb-core-kqp-ut-federated_query + ydb-core-kqp-ut-federated_query-s3 PROPERTY SPLIT_FACTOR 1 ) add_yunittest( NAME - ydb-core-kqp-ut-federated_query + ydb-core-kqp-ut-federated_query-s3 TEST_TARGET - ydb-core-kqp-ut-federated_query + ydb-core-kqp-ut-federated_query-s3 TEST_ARG --print-before-suite --print-before-test @@ -60,26 +60,26 @@ add_yunittest( ) set_yunittest_property( TEST - ydb-core-kqp-ut-federated_query + ydb-core-kqp-ut-federated_query-s3 PROPERTY LABELS MEDIUM ) set_yunittest_property( TEST - ydb-core-kqp-ut-federated_query + ydb-core-kqp-ut-federated_query-s3 PROPERTY PROCESSORS 1 ) set_yunittest_property( TEST - ydb-core-kqp-ut-federated_query + ydb-core-kqp-ut-federated_query-s3 PROPERTY TIMEOUT 600 ) -target_allocator(ydb-core-kqp-ut-federated_query +target_allocator(ydb-core-kqp-ut-federated_query-s3 cpp-malloc-jemalloc ) -vcs_info(ydb-core-kqp-ut-federated_query) +vcs_info(ydb-core-kqp-ut-federated_query-s3) diff --git a/ydb/core/kqp/ut/federated_query/CMakeLists.linux-x86_64.txt b/ydb/core/kqp/ut/federated_query/s3/CMakeLists.linux-x86_64.txt index d494f203529..5d3f80e8e31 100644 --- a/ydb/core/kqp/ut/federated_query/CMakeLists.linux-x86_64.txt +++ b/ydb/core/kqp/ut/federated_query/s3/CMakeLists.linux-x86_64.txt @@ -7,14 +7,14 @@ -add_executable(ydb-core-kqp-ut-federated_query) -target_compile_options(ydb-core-kqp-ut-federated_query PRIVATE +add_executable(ydb-core-kqp-ut-federated_query-s3) +target_compile_options(ydb-core-kqp-ut-federated_query-s3 PRIVATE -DUSE_CURRENT_UDF_ABI_VERSION ) -target_include_directories(ydb-core-kqp-ut-federated_query PRIVATE +target_include_directories(ydb-core-kqp-ut-federated_query-s3 PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kqp ) -target_link_libraries(ydb-core-kqp-ut-federated_query PUBLIC +target_link_libraries(ydb-core-kqp-ut-federated_query-s3 PUBLIC contrib-libs-linux-headers contrib-libs-cxxsupp yutil @@ -23,11 +23,11 @@ target_link_libraries(ydb-core-kqp-ut-federated_query PUBLIC ydb-core-kqp libs-aws-sdk-cpp-aws-cpp-sdk-s3 kqp-ut-common + ut-federated_query-common yql-sql-pg_dummy - cpp-client-ydb_operation client-ydb_types-operation ) -target_link_options(ydb-core-kqp-ut-federated_query PRIVATE +target_link_options(ydb-core-kqp-ut-federated_query-s3 PRIVATE -ldl -lrt -Wl,--no-as-needed @@ -37,21 +37,21 @@ target_link_options(ydb-core-kqp-ut-federated_query PRIVATE -lrt -ldl ) -target_sources(ydb-core-kqp-ut-federated_query PRIVATE - ${CMAKE_SOURCE_DIR}/ydb/core/kqp/ut/federated_query/kqp_federated_query_ut.cpp +target_sources(ydb-core-kqp-ut-federated_query-s3 PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/kqp/ut/federated_query/s3/kqp_federated_query_ut.cpp ) set_property( TARGET - ydb-core-kqp-ut-federated_query + ydb-core-kqp-ut-federated_query-s3 PROPERTY SPLIT_FACTOR 1 ) add_yunittest( NAME - ydb-core-kqp-ut-federated_query + ydb-core-kqp-ut-federated_query-s3 TEST_TARGET - ydb-core-kqp-ut-federated_query + ydb-core-kqp-ut-federated_query-s3 TEST_ARG --print-before-suite --print-before-test @@ -61,27 +61,27 @@ add_yunittest( ) set_yunittest_property( TEST - ydb-core-kqp-ut-federated_query + ydb-core-kqp-ut-federated_query-s3 PROPERTY LABELS MEDIUM ) set_yunittest_property( TEST - ydb-core-kqp-ut-federated_query + ydb-core-kqp-ut-federated_query-s3 PROPERTY PROCESSORS 1 ) set_yunittest_property( TEST - ydb-core-kqp-ut-federated_query + ydb-core-kqp-ut-federated_query-s3 PROPERTY TIMEOUT 600 ) -target_allocator(ydb-core-kqp-ut-federated_query +target_allocator(ydb-core-kqp-ut-federated_query-s3 cpp-malloc-tcmalloc libs-tcmalloc-no_percpu_cache ) -vcs_info(ydb-core-kqp-ut-federated_query) +vcs_info(ydb-core-kqp-ut-federated_query-s3) diff --git a/ydb/core/kqp/ut/federated_query/s3/CMakeLists.txt b/ydb/core/kqp/ut/federated_query/s3/CMakeLists.txt new file mode 100644 index 00000000000..f8b31df0c11 --- /dev/null +++ b/ydb/core/kqp/ut/federated_query/s3/CMakeLists.txt @@ -0,0 +1,17 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-aarch64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64") + include(CMakeLists.darwin-x86_64.txt) +elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA) + include(CMakeLists.windows-x86_64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-x86_64.txt) +endif() diff --git a/ydb/core/kqp/ut/federated_query/CMakeLists.windows-x86_64.txt b/ydb/core/kqp/ut/federated_query/s3/CMakeLists.windows-x86_64.txt index 04e259e6c76..e8ca79a81a1 100644 --- a/ydb/core/kqp/ut/federated_query/CMakeLists.windows-x86_64.txt +++ b/ydb/core/kqp/ut/federated_query/s3/CMakeLists.windows-x86_64.txt @@ -7,14 +7,14 @@ -add_executable(ydb-core-kqp-ut-federated_query) -target_compile_options(ydb-core-kqp-ut-federated_query PRIVATE +add_executable(ydb-core-kqp-ut-federated_query-s3) +target_compile_options(ydb-core-kqp-ut-federated_query-s3 PRIVATE -DUSE_CURRENT_UDF_ABI_VERSION ) -target_include_directories(ydb-core-kqp-ut-federated_query PRIVATE +target_include_directories(ydb-core-kqp-ut-federated_query-s3 PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kqp ) -target_link_libraries(ydb-core-kqp-ut-federated_query PUBLIC +target_link_libraries(ydb-core-kqp-ut-federated_query-s3 PUBLIC contrib-libs-cxxsupp yutil library-cpp-cpuid_check @@ -22,25 +22,25 @@ target_link_libraries(ydb-core-kqp-ut-federated_query PUBLIC ydb-core-kqp libs-aws-sdk-cpp-aws-cpp-sdk-s3 kqp-ut-common + ut-federated_query-common yql-sql-pg_dummy - cpp-client-ydb_operation client-ydb_types-operation ) -target_sources(ydb-core-kqp-ut-federated_query PRIVATE - ${CMAKE_SOURCE_DIR}/ydb/core/kqp/ut/federated_query/kqp_federated_query_ut.cpp +target_sources(ydb-core-kqp-ut-federated_query-s3 PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/kqp/ut/federated_query/s3/kqp_federated_query_ut.cpp ) set_property( TARGET - ydb-core-kqp-ut-federated_query + ydb-core-kqp-ut-federated_query-s3 PROPERTY SPLIT_FACTOR 1 ) add_yunittest( NAME - ydb-core-kqp-ut-federated_query + ydb-core-kqp-ut-federated_query-s3 TEST_TARGET - ydb-core-kqp-ut-federated_query + ydb-core-kqp-ut-federated_query-s3 TEST_ARG --print-before-suite --print-before-test @@ -50,26 +50,26 @@ add_yunittest( ) set_yunittest_property( TEST - ydb-core-kqp-ut-federated_query + ydb-core-kqp-ut-federated_query-s3 PROPERTY LABELS MEDIUM ) set_yunittest_property( TEST - ydb-core-kqp-ut-federated_query + ydb-core-kqp-ut-federated_query-s3 PROPERTY PROCESSORS 1 ) set_yunittest_property( TEST - ydb-core-kqp-ut-federated_query + ydb-core-kqp-ut-federated_query-s3 PROPERTY TIMEOUT 600 ) -target_allocator(ydb-core-kqp-ut-federated_query +target_allocator(ydb-core-kqp-ut-federated_query-s3 system_allocator ) -vcs_info(ydb-core-kqp-ut-federated_query) +vcs_info(ydb-core-kqp-ut-federated_query-s3) diff --git a/ydb/core/kqp/ut/federated_query/kqp_federated_query_ut.cpp b/ydb/core/kqp/ut/federated_query/s3/kqp_federated_query_ut.cpp index 93e644cba81..aff9a1a45f7 100644 --- a/ydb/core/kqp/ut/federated_query/kqp_federated_query_ut.cpp +++ b/ydb/core/kqp/ut/federated_query/s3/kqp_federated_query_ut.cpp @@ -1,5 +1,3 @@ -#include <ydb/core/kqp/ut/common/kqp_ut_common.h> - #include <util/system/env.h> #include <aws/core/auth/AWSCredentialsProvider.h> @@ -10,20 +8,24 @@ #include <aws/s3/model/PutObjectRequest.h> #include <aws/s3/S3Client.h> +#include <ydb/core/kqp/ut/common/kqp_ut_common.h> +#include <ydb/core/kqp/ut/federated_query/common/common.h> +#include <ydb/core/kqp/federated_query/kqp_federated_query_helpers.h> #include <ydb/library/yql/utils/log/log.h> #include <ydb/public/sdk/cpp/client/ydb_operation/operation.h> #include <ydb/public/sdk/cpp/client/ydb_proto/accessor.h> #include <ydb/public/sdk/cpp/client/ydb_types/operation/operation.h> +#include <ydb/public/sdk/cpp/client/ydb_table/table.h> #include <fmt/format.h> -#include <ydb/public/sdk/cpp/client/ydb_table/table.h> namespace NKikimr { namespace NKqp { using namespace NYdb; using namespace NYdb::NQuery; +using namespace NKikimr::NKqp::NFederatedQueryTest; constexpr TStringBuf TEST_CONTENT = R"({"key": "1", "value": "trololo"} @@ -164,18 +166,6 @@ TString GetBucketLocation(const TStringBuf bucket) { return TStringBuilder() << GetEnv("S3_ENDPOINT") << '/' << bucket << '/'; } -NYdb::NQuery::TScriptExecutionOperation WaitScriptExecutionOperation(const NYdb::TOperation::TOperationId& operationId, const NYdb::TDriver& ydbDriver) { - NYdb::NOperation::TOperationClient client(ydbDriver); - NThreading::TFuture<NYdb::NQuery::TScriptExecutionOperation> op; - do { - if (op.Initialized()) { - Sleep(TDuration::MilliSeconds(10)); - } - op = client.Get<NYdb::NQuery::TScriptExecutionOperation>(operationId); - UNIT_ASSERT_C(op.GetValueSync().Status().IsSuccess(), op.GetValueSync().Status().GetStatus() << ":" << op.GetValueSync().Status().GetIssues().ToString()); - } while (!op.GetValueSync().Ready()); - return op.GetValueSync(); -} Y_UNIT_TEST_SUITE(KqpFederatedQuery) { Y_UNIT_TEST(ExecuteScriptWithExternalTableResolve) { @@ -187,10 +177,9 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) { CreateBucketWithObject(bucket, object, TEST_CONTENT); - auto kikimr = DefaultKikimrRunner(); - kikimr.GetTestServer().GetRuntime()->GetAppData(0).FeatureFlags.SetEnableExternalDataSources(true); + auto kikimr = MakeKikimrRunner(NYql::IHTTPGateway::Make()); - auto tc = kikimr.GetTableClient(); + auto tc = kikimr->GetTableClient(); auto session = tc.CreateSession().GetValueSync().GetSession(); const TString query = fmt::format(R"( CREATE EXTERNAL DATA SOURCE `{external_source}` WITH ( @@ -219,12 +208,12 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) { SELECT * FROM `{external_table}` )", "external_table"_a=externalTableName); - auto db = kikimr.GetQueryClient(); + auto db = kikimr->GetQueryClient(); auto scriptExecutionOperation = db.ExecuteScript(sql).ExtractValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(scriptExecutionOperation.Status().GetStatus(), EStatus::SUCCESS, scriptExecutionOperation.Status().GetIssues().ToString()); UNIT_ASSERT(scriptExecutionOperation.Metadata().ExecutionId); - NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr.GetDriver()); + NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr->GetDriver()); UNIT_ASSERT_EQUAL(readyOp.Metadata().ExecStatus, EExecStatus::Completed); TFetchScriptResultsResult results = db.FetchScriptResults(scriptExecutionOperation.Id(), 0).ExtractValueSync(); UNIT_ASSERT_C(results.IsSuccess(), results.GetIssues().ToString()); @@ -251,10 +240,9 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) { CreateBucketWithObject(bucket, object, TEST_CONTENT); - auto kikimr = DefaultKikimrRunner(); - kikimr.GetTestServer().GetRuntime()->GetAppData(0).FeatureFlags.SetEnableExternalDataSources(true); + auto kikimr = MakeKikimrRunner(NYql::IHTTPGateway::Make()); - auto tc = kikimr.GetTableClient(); + auto tc = kikimr->GetTableClient(); auto session = tc.CreateSession().GetValueSync().GetSession(); const TString query = fmt::format(R"( CREATE EXTERNAL DATA SOURCE `{external_source}` WITH ( @@ -282,7 +270,7 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) { SELECT * FROM `{external_table}` )", "external_table"_a=externalTableName); - auto db = kikimr.GetQueryClient(); + auto db = kikimr->GetQueryClient(); auto executeQueryIterator = db.StreamExecuteQuery(sql, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync(); size_t currentRow = 0; @@ -323,10 +311,9 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) { CreateBucketWithObject(bucket, object, TEST_CONTENT); - auto kikimr = DefaultKikimrRunner(); - kikimr.GetTestServer().GetRuntime()->GetAppData(0).FeatureFlags.SetEnableExternalDataSources(true); + auto kikimr = MakeKikimrRunner(NYql::IHTTPGateway::Make()); - auto tc = kikimr.GetTableClient(); + auto tc = kikimr->GetTableClient(); auto session = tc.CreateSession().GetValueSync().GetSession(); const TString query = fmt::format(R"( CREATE EXTERNAL DATA SOURCE `{external_source}` WITH ( @@ -356,17 +343,17 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) { SELECT * FROM `{external_table}` )", "external_table"_a=externalTableName); - auto db = kikimr.GetQueryClient(); + auto db = kikimr->GetQueryClient(); auto scriptExecutionOperation = db.ExecuteScript(sql, settings).ExtractValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(scriptExecutionOperation.Status().GetStatus(), EStatus::SUCCESS, scriptExecutionOperation.Status().GetIssues().ToString()); UNIT_ASSERT(scriptExecutionOperation.Metadata().ExecutionId); - NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr.GetDriver()); + NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr->GetDriver()); UNIT_ASSERT_EQUAL(readyOp.Metadata().ExecStatus, EExecStatus::Completed); UNIT_ASSERT_EQUAL(readyOp.Metadata().ExecStats.compilation().from_cache(), false); scriptExecutionOperation = db.ExecuteScript(sql, settings).ExtractValueSync(); - readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr.GetDriver()); + readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr->GetDriver()); UNIT_ASSERT_EQUAL(readyOp.Metadata().ExecStatus, EExecStatus::Completed); UNIT_ASSERT_EQUAL(readyOp.Metadata().ExecStats.compilation().from_cache(), false); } @@ -378,9 +365,8 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) { CreateBucketWithObject(bucket, "test_object", TEST_CONTENT); - auto kikimr = DefaultKikimrRunner(); - kikimr.GetTestServer().GetRuntime()->GetAppData(0).FeatureFlags.SetEnableExternalDataSources(true); - auto tc = kikimr.GetTableClient(); + auto kikimr = MakeKikimrRunner(NYql::IHTTPGateway::Make()); + auto tc = kikimr->GetTableClient(); auto session = tc.CreateSession().GetValueSync().GetSession(); const TString query = fmt::format(R"( CREATE EXTERNAL DATA SOURCE `{external_source}` WITH ( @@ -393,7 +379,7 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) { ); auto result = session.ExecuteSchemeQuery(query).GetValueSync(); UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); - auto db = kikimr.GetQueryClient(); + auto db = kikimr->GetQueryClient(); auto scriptExecutionOperation = db.ExecuteScript(fmt::format(R"( SELECT * FROM `{external_source}`.`/` WITH ( format="json_each_row", @@ -406,7 +392,7 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) { UNIT_ASSERT_VALUES_EQUAL_C(scriptExecutionOperation.Status().GetStatus(), EStatus::SUCCESS, scriptExecutionOperation.Status().GetIssues().ToString()); UNIT_ASSERT(scriptExecutionOperation.Metadata().ExecutionId); - NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr.GetDriver()); + NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr->GetDriver()); UNIT_ASSERT_EQUAL(readyOp.Metadata().ExecStatus, EExecStatus::Completed); TFetchScriptResultsResult results = db.FetchScriptResults(scriptExecutionOperation.Id(), 0).ExtractValueSync(); UNIT_ASSERT_C(results.IsSuccess(), results.GetIssues().ToString()); @@ -430,9 +416,8 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) { CreateBucketWithObject(bucket, "test_object", TEST_CONTENT); - auto kikimr = DefaultKikimrRunner(); - kikimr.GetTestServer().GetRuntime()->GetAppData(0).FeatureFlags.SetEnableExternalDataSources(true); - auto tc = kikimr.GetTableClient(); + auto kikimr = MakeKikimrRunner(NYql::IHTTPGateway::Make()); + auto tc = kikimr->GetTableClient(); auto session = tc.CreateSession().GetValueSync().GetSession(); { const TString query = fmt::format(R"( @@ -465,7 +450,7 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) { auto result = session.ExecuteDataQuery(query, NYdb::NTable::TTxControl::BeginTx(NYdb::NTable::TTxSettings::SerializableRW()).CommitTx()).GetValueSync(); UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); } - auto db = kikimr.GetQueryClient(); + auto db = kikimr->GetQueryClient(); auto scriptExecutionOperation = db.ExecuteScript(fmt::format(R"( SELECT t1.key as key, t1.value as v1, t2.value as v2 FROM `{external_source}`.`/` WITH ( format="json_each_row", @@ -480,7 +465,7 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) { UNIT_ASSERT_VALUES_EQUAL_C(scriptExecutionOperation.Status().GetStatus(), EStatus::SUCCESS, scriptExecutionOperation.Status().GetIssues().ToString()); UNIT_ASSERT(scriptExecutionOperation.Metadata().ExecutionId); - NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr.GetDriver()); + NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr->GetDriver()); UNIT_ASSERT_EQUAL(readyOp.Metadata().ExecStatus, EExecStatus::Completed); TFetchScriptResultsResult results = db.FetchScriptResults(scriptExecutionOperation.Id(), 0).ExtractValueSync(); UNIT_ASSERT_C(results.IsSuccess(), results.GetIssues().ToString()); @@ -507,10 +492,9 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) { CreateBucketWithObject(bucket, object, TEST_CONTENT); - auto kikimr = DefaultKikimrRunner(); - kikimr.GetTestServer().GetRuntime()->GetAppData(0).FeatureFlags.SetEnableExternalDataSources(true); + auto kikimr = MakeKikimrRunner(NYql::IHTTPGateway::Make()); - auto tc = kikimr.GetTableClient(); + auto tc = kikimr->GetTableClient(); auto session = tc.CreateSession().GetValueSync().GetSession(); const TString query = fmt::format(R"( CREATE EXTERNAL DATA SOURCE `{external_source}` WITH ( @@ -534,18 +518,18 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) { auto result = session.ExecuteSchemeQuery(query).GetValueSync(); UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); - auto db = kikimr.GetQueryClient(); + auto db = kikimr->GetQueryClient(); auto scriptExecutionOperation = db.ExecuteScript(fmt::format(R"( PRAGMA s3.JsonListSizeLimit = "10"; PRAGMA s3.SourceCoroActor = 'true'; - PRAGMA Kikimr.OptEnableOlapPushdown = "false"; + PRAGMA kikimr.OptEnableOlapPushdown = "false"; SELECT * FROM `{external_table}` )", "external_table"_a=externalTableName)).ExtractValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(scriptExecutionOperation.Status().GetStatus(), EStatus::SUCCESS, scriptExecutionOperation.Status().GetIssues().ToString()); UNIT_ASSERT(scriptExecutionOperation.Metadata().ExecutionId); - NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr.GetDriver()); - UNIT_ASSERT_EQUAL(readyOp.Metadata().ExecStatus, EExecStatus::Completed); + NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr->GetDriver()); + UNIT_ASSERT_C(readyOp.Metadata().ExecStatus == EExecStatus::Completed, readyOp.Status().GetIssues().ToString()); TFetchScriptResultsResult results = db.FetchScriptResults(scriptExecutionOperation.Id(), 0).ExtractValueSync(); UNIT_ASSERT_C(results.IsSuccess(), results.GetIssues().ToString()); @@ -570,9 +554,8 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) { CreateBucketWithObject(bucket, "test_object", TEST_CONTENT); - auto kikimr = DefaultKikimrRunner(); - kikimr.GetTestServer().GetRuntime()->GetAppData(0).FeatureFlags.SetEnableExternalDataSources(true); - auto tc = kikimr.GetTableClient(); + auto kikimr = MakeKikimrRunner(NYql::IHTTPGateway::Make()); + auto tc = kikimr->GetTableClient(); auto session = tc.CreateSession().GetValueSync().GetSession(); { const TString query = fmt::format(R"( @@ -605,11 +588,11 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) { auto result = session.ExecuteDataQuery(query, NYdb::NTable::TTxControl::BeginTx(NYdb::NTable::TTxSettings::SerializableRW()).CommitTx()).GetValueSync(); UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); } - auto db = kikimr.GetQueryClient(); + auto db = kikimr->GetQueryClient(); auto scriptExecutionOperation = db.ExecuteScript(fmt::format(R"( PRAGMA s3.JsonListSizeLimit = "10"; PRAGMA s3.SourceCoroActor = 'true'; - PRAGMA Kikimr.OptEnableOlapPushdown = "false"; + PRAGMA kikimr.OptEnableOlapPushdown = "false"; SELECT t1.key as key, t1.value as v1, t2.value as v2 FROM `{external_source}`.`/` WITH ( format="json_each_row", schema( @@ -623,8 +606,8 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) { UNIT_ASSERT_VALUES_EQUAL_C(scriptExecutionOperation.Status().GetStatus(), EStatus::SUCCESS, scriptExecutionOperation.Status().GetIssues().ToString()); UNIT_ASSERT(scriptExecutionOperation.Metadata().ExecutionId); - NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr.GetDriver()); - UNIT_ASSERT_EQUAL(readyOp.Metadata().ExecStatus, EExecStatus::Completed); + NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr->GetDriver()); + UNIT_ASSERT_C(readyOp.Metadata().ExecStatus == EExecStatus::Completed, readyOp.Status().GetIssues().ToString()); TFetchScriptResultsResult results = db.FetchScriptResults(scriptExecutionOperation.Id(), 0).ExtractValueSync(); UNIT_ASSERT_C(results.IsSuccess(), results.GetIssues().ToString()); @@ -648,9 +631,8 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) { CreateBucketWithObject(bucket, "test_object", TEST_CONTENT); - auto kikimr = DefaultKikimrRunner(); - kikimr.GetTestServer().GetRuntime()->GetAppData(0).FeatureFlags.SetEnableExternalDataSources(true); - auto tc = kikimr.GetTableClient(); + auto kikimr = MakeKikimrRunner(NYql::IHTTPGateway::Make()); + auto tc = kikimr->GetTableClient(); auto session = tc.CreateSession().GetValueSync().GetSession(); const TString query = fmt::format(R"( CREATE EXTERNAL DATA SOURCE `{external_source}` WITH ( @@ -663,7 +645,7 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) { ); auto result = session.ExecuteSchemeQuery(query).GetValueSync(); UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); - auto db = kikimr.GetQueryClient(); + auto db = kikimr->GetQueryClient(); auto scriptExecutionOperation = db.ExecuteScript(fmt::format(R"( SELECT * FROM `{external_source}`.`*` WITH ( format="json_each_row", @@ -676,7 +658,7 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) { UNIT_ASSERT_VALUES_EQUAL_C(scriptExecutionOperation.Status().GetStatus(), EStatus::SUCCESS, scriptExecutionOperation.Status().GetIssues().ToString()); UNIT_ASSERT(scriptExecutionOperation.Metadata().ExecutionId); - NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr.GetDriver()); + NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr->GetDriver()); UNIT_ASSERT_EQUAL(readyOp.Metadata().ExecStatus, EExecStatus::Completed); TFetchScriptResultsResult results = db.FetchScriptResults(scriptExecutionOperation.Id(), 0).ExtractValueSync(); UNIT_ASSERT_C(results.IsSuccess(), results.GetIssues().ToString()); @@ -701,20 +683,12 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) { CreateBucketWithObject(bucket, object, TEST_CONTENT); - NKikimrConfig::TAppConfig appConfig; - appConfig.MutableTableServiceConfig()->SetBindingsMode(mode); - - NKikimrConfig::TFeatureFlags featureFlags; - featureFlags.SetEnableExternalDataSources(true); - featureFlags.SetEnableScriptExecutionOperations(true); - - auto settings = TKikimrSettings() - .SetAppConfig(appConfig) - .SetFeatureFlags(featureFlags); + auto appConfig = std::make_optional<NKikimrConfig::TAppConfig>(); + appConfig->MutableTableServiceConfig()->SetBindingsMode(mode); - TKikimrRunner kikimr(settings); + auto kikimr = MakeKikimrRunner(NYql::IHTTPGateway::Make(), nullptr, appConfig); - auto tc = kikimr.GetTableClient(); + auto tc = kikimr->GetTableClient(); auto session = tc.CreateSession().GetValueSync().GetSession(); const TString query = fmt::format(R"( CREATE EXTERNAL DATA SOURCE `{external_source}` WITH ( @@ -742,12 +716,12 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) { SELECT * FROM bindings.`{external_table}` )", "external_table"_a=externalTableName); - auto db = kikimr.GetQueryClient(); + auto db = kikimr->GetQueryClient(); auto scriptExecutionOperation = db.ExecuteScript(sql).ExtractValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(scriptExecutionOperation.Status().GetStatus(), EStatus::SUCCESS, scriptExecutionOperation.Status().GetIssues().ToString()); UNIT_ASSERT(scriptExecutionOperation.Metadata().ExecutionId); - NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr.GetDriver()); + NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr->GetDriver()); TFetchScriptResultsResult results(TStatus(EStatus::SUCCESS, {})); if (readyOp.Metadata().ExecStatus == EExecStatus::Completed) { results = db.FetchScriptResults(scriptExecutionOperation.Id(), 0).ExtractValueSync(); @@ -824,10 +798,9 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) { CreateBucket(writeBucket, s3Client); } - auto kikimr = DefaultKikimrRunner(); - kikimr.GetTestServer().GetRuntime()->GetAppData(0).FeatureFlags.SetEnableExternalDataSources(true); + auto kikimr = MakeKikimrRunner(NYql::IHTTPGateway::Make()); - auto tc = kikimr.GetTableClient(); + auto tc = kikimr->GetTableClient(); auto session = tc.CreateSession().GetValueSync().GetSession(); const TString query = fmt::format(R"( CREATE EXTERNAL DATA SOURCE `{read_source}` WITH ( @@ -877,7 +850,7 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) { "read_table"_a=readTableName, "write_table"_a = writeTableName); - auto db = kikimr.GetQueryClient(); + auto db = kikimr->GetQueryClient(); auto resultFuture = db.ExecuteQuery(sql, NYdb::NQuery::TTxControl::BeginTx().CommitTx()); resultFuture.Wait(); UNIT_ASSERT_C(resultFuture.GetValueSync().IsSuccess(), resultFuture.GetValueSync().GetIssues().ToString()); @@ -917,10 +890,9 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) { CreateBucket(writeBucket, s3Client); } - auto kikimr = DefaultKikimrRunner(); - kikimr.GetTestServer().GetRuntime()->GetAppData(0).FeatureFlags.SetEnableExternalDataSources(true); + auto kikimr = MakeKikimrRunner(NYql::IHTTPGateway::Make()); - auto tc = kikimr.GetTableClient(); + auto tc = kikimr->GetTableClient(); auto session = tc.CreateSession().GetValueSync().GetSession(); { const TString query = fmt::format(R"( @@ -963,7 +935,7 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) { auto result = session.ExecuteDataQuery(query, NYdb::NTable::TTxControl::BeginTx(NYdb::NTable::TTxSettings::SerializableRW()).CommitTx()).GetValueSync(); UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); } - auto db = kikimr.GetQueryClient(); + auto db = kikimr->GetQueryClient(); ExecuteInsertQuery(db, writeTableName, readTableName, false); ExecuteInsertQuery(db, writeTableName, readTableName, true); { @@ -1005,10 +977,9 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) { UploadObject(bucket, keysObject, TEST_CONTENT_KEYS, s3Client); } - auto kikimr = DefaultKikimrRunner(); - kikimr.GetTestServer().GetRuntime()->GetAppData(0).FeatureFlags.SetEnableExternalDataSources(true); + auto kikimr = MakeKikimrRunner(NYql::IHTTPGateway::Make()); - auto tc = kikimr.GetTableClient(); + auto tc = kikimr->GetTableClient(); auto session = tc.CreateSession().GetValueSync().GetSession(); const TString query = fmt::format(R"( CREATE EXTERNAL DATA SOURCE `{data_source}` WITH ( @@ -1053,7 +1024,7 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) { "data_table"_a = dataTable, "keys_table"_a = keysTable); - auto db = kikimr.GetQueryClient(); + auto db = kikimr->GetQueryClient(); auto resultFuture = db.ExecuteQuery(sql, NYdb::NQuery::TTxControl::BeginTx().CommitTx()); resultFuture.Wait(); UNIT_ASSERT_C(resultFuture.GetValueSync().IsSuccess(), resultFuture.GetValueSync().GetIssues().ToString()); @@ -1075,10 +1046,9 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) { CreateBucketWithObject(bucket, object, content); - auto kikimr = DefaultKikimrRunner(); - kikimr.GetTestServer().GetRuntime()->GetAppData(0).FeatureFlags.SetEnableExternalDataSources(true); + auto kikimr = MakeKikimrRunner(NYql::IHTTPGateway::Make()); - auto tc = kikimr.GetTableClient(); + auto tc = kikimr->GetTableClient(); auto session = tc.CreateSession().GetValueSync().GetSession(); const TString query = fmt::format(R"( CREATE EXTERNAL DATA SOURCE `{external_source}` WITH ( @@ -1104,7 +1074,7 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) { auto result = session.ExecuteSchemeQuery(query).GetValueSync(); UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); - auto db = kikimr.GetQueryClient(); + auto db = kikimr->GetQueryClient(); const TString sql = fmt::format(R"( SELECT * FROM `{external_table}` )", "external_table"_a = externalTableName); @@ -1112,7 +1082,7 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) { auto scriptExecutionOperation = db.ExecuteScript(sql).ExtractValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(scriptExecutionOperation.Status().GetStatus(), EStatus::SUCCESS, scriptExecutionOperation.Status().GetIssues().ToString()); - NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr.GetDriver()); + NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr->GetDriver()); UNIT_ASSERT_EQUAL(readyOp.Metadata().ExecStatus, EExecStatus::Completed); TFetchScriptResultsResult results = db.FetchScriptResults(scriptExecutionOperation.Id(), 0).ExtractValueSync(); UNIT_ASSERT_C(results.IsSuccess(), results.GetIssues().ToString()); diff --git a/ydb/core/kqp/ut/federated_query/s3/ya.make b/ydb/core/kqp/ut/federated_query/s3/ya.make new file mode 100644 index 00000000000..e043ec191fb --- /dev/null +++ b/ydb/core/kqp/ut/federated_query/s3/ya.make @@ -0,0 +1,28 @@ +UNITTEST_FOR(ydb/core/kqp) + +IF (WITH_VALGRIND) + TIMEOUT(3600) + SIZE(LARGE) + TAG(ya:fat) +ELSE() + TIMEOUT(600) + SIZE(MEDIUM) +ENDIF() + +SRCS( + kqp_federated_query_ut.cpp +) + +PEERDIR( + contrib/libs/aws-sdk-cpp/aws-cpp-sdk-s3 + ydb/core/kqp/ut/common + ydb/core/kqp/ut/federated_query/common + ydb/library/yql/sql/pg_dummy + ydb/public/sdk/cpp/client/ydb_types/operation +) + +YQL_LAST_ABI_VERSION() + +INCLUDE(${ARCADIA_ROOT}/ydb/tests/tools/s3_recipe/recipe.inc) + +END() diff --git a/ydb/core/kqp/ut/federated_query/ya.make b/ydb/core/kqp/ut/federated_query/ya.make index dd07aecaf6b..abe2eb1010f 100644 --- a/ydb/core/kqp/ut/federated_query/ya.make +++ b/ydb/core/kqp/ut/federated_query/ya.make @@ -1,28 +1,7 @@ -UNITTEST_FOR(ydb/core/kqp) - -IF (WITH_VALGRIND) - TIMEOUT(3600) - SIZE(LARGE) - TAG(ya:fat) -ELSE() - TIMEOUT(600) - SIZE(MEDIUM) -ENDIF() - -SRCS( - kqp_federated_query_ut.cpp +RECURSE_FOR_TESTS( + s3 ) -PEERDIR( - contrib/libs/aws-sdk-cpp/aws-cpp-sdk-s3 - ydb/core/kqp/ut/common - ydb/library/yql/sql/pg_dummy - ydb/public/sdk/cpp/client/ydb_operation - ydb/public/sdk/cpp/client/ydb_types/operation +RECURSE( + common ) - -YQL_LAST_ABI_VERSION() - -INCLUDE(${ARCADIA_ROOT}/ydb/tests/tools/s3_recipe/recipe.inc) - -END() diff --git a/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp b/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp index 16d0ca327b0..6ebd3e68b46 100644 --- a/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp +++ b/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp @@ -52,8 +52,9 @@ TIntrusivePtr<IKqpHost> CreateKikimrQueryProcessor(TIntrusivePtr<IKqpGateway> ga UNIT_ASSERT(TryParseFromTextFormat(defaultSettingsStream, defaultSettings)); kikimrConfig->Init(defaultSettings.GetDefaultSettings(), cluster, settings, true); - return NKqp::CreateKqpHost(gateway, cluster, "/Root", kikimrConfig, moduleResolver, NYql::IHTTPGateway::Make(), funcRegistry, - keepConfigChanges); + auto federatedQuerySetup = std::make_optional<TKqpFederatedQuerySetup>({NYql::IHTTPGateway::Make(), nullptr, nullptr, nullptr}); + return NKqp::CreateKqpHost(gateway, cluster, "/Root", kikimrConfig, moduleResolver, + federatedQuerySetup, funcRegistry, funcRegistry, keepConfigChanges); } NYql::NNodes::TExprBase GetExpr(const TString& ast, NYql::TExprContext& ctx, NYql::IModuleResolver* moduleResolver) { diff --git a/ydb/core/kqp/ya.make b/ydb/core/kqp/ya.make index e34ec78a290..0c70ffb04c7 100644 --- a/ydb/core/kqp/ya.make +++ b/ydb/core/kqp/ya.make @@ -61,6 +61,7 @@ RECURSE( counters executer_actor expr_nodes + federated_query gateway host node_service diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto index ec4634efd6d..30fb7e37a87 100644 --- a/ydb/core/protos/config.proto +++ b/ydb/core/protos/config.proto @@ -1391,6 +1391,8 @@ message TQueryServiceConfig { optional NYql.TS3GatewayConfig S3 = 6; optional NYql.THttpGatewayConfig HttpGateway = 7; optional NYql.TGenericConnectorConfig Connector = 8; + optional string MdbGateway = 9; + optional bool MdbTransformHost = 10; } // Config describes immediate controls and allows diff --git a/ydb/core/testlib/CMakeLists.darwin-x86_64.txt b/ydb/core/testlib/CMakeLists.darwin-x86_64.txt index c536df6d5da..196f5b664a1 100644 --- a/ydb/core/testlib/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/testlib/CMakeLists.darwin-x86_64.txt @@ -47,6 +47,7 @@ target_link_libraries(ydb-core-testlib PUBLIC core-kesus-tablet ydb-core-keyvalue ydb-core-kqp + core-kqp-federated_query ydb-core-metering ydb-core-mind core-mind-address_classification diff --git a/ydb/core/testlib/CMakeLists.linux-aarch64.txt b/ydb/core/testlib/CMakeLists.linux-aarch64.txt index 9ed91887756..80814daf462 100644 --- a/ydb/core/testlib/CMakeLists.linux-aarch64.txt +++ b/ydb/core/testlib/CMakeLists.linux-aarch64.txt @@ -48,6 +48,7 @@ target_link_libraries(ydb-core-testlib PUBLIC core-kesus-tablet ydb-core-keyvalue ydb-core-kqp + core-kqp-federated_query ydb-core-metering ydb-core-mind core-mind-address_classification diff --git a/ydb/core/testlib/CMakeLists.linux-x86_64.txt b/ydb/core/testlib/CMakeLists.linux-x86_64.txt index 9ed91887756..80814daf462 100644 --- a/ydb/core/testlib/CMakeLists.linux-x86_64.txt +++ b/ydb/core/testlib/CMakeLists.linux-x86_64.txt @@ -48,6 +48,7 @@ target_link_libraries(ydb-core-testlib PUBLIC core-kesus-tablet ydb-core-keyvalue ydb-core-kqp + core-kqp-federated_query ydb-core-metering ydb-core-mind core-mind-address_classification diff --git a/ydb/core/testlib/CMakeLists.windows-x86_64.txt b/ydb/core/testlib/CMakeLists.windows-x86_64.txt index c536df6d5da..196f5b664a1 100644 --- a/ydb/core/testlib/CMakeLists.windows-x86_64.txt +++ b/ydb/core/testlib/CMakeLists.windows-x86_64.txt @@ -47,6 +47,7 @@ target_link_libraries(ydb-core-testlib PUBLIC core-kesus-tablet ydb-core-keyvalue ydb-core-kqp + core-kqp-federated_query ydb-core-metering ydb-core-mind core-mind-address_classification diff --git a/ydb/core/testlib/test_client.cpp b/ydb/core/testlib/test_client.cpp index da0510df57f..3385a8246d9 100644 --- a/ydb/core/testlib/test_client.cpp +++ b/ydb/core/testlib/test_client.cpp @@ -819,11 +819,11 @@ namespace Tests { IActor* kqpProxyService = NKqp::CreateKqpProxyService(Settings->AppConfig.GetLogConfig(), Settings->AppConfig.GetTableServiceConfig(), - Settings->AppConfig.GetAuthConfig().GetTokenAccessorConfig(), Settings->AppConfig.GetQueryServiceConfig(), Settings->AppConfig.GetMetadataProviderConfig(), TVector<NKikimrKqp::TKqpSetting>(Settings->KqpSettings), - nullptr, std::move(kqpProxySharedResources)); + nullptr, std::move(kqpProxySharedResources), + Settings->FederatedQuerySetupFactory); TActorId kqpProxyServiceId = Runtime->Register(kqpProxyService, nodeIdx); Runtime->RegisterService(NKqp::MakeKqpProxyID(Runtime->GetNodeId(nodeIdx)), kqpProxyServiceId, nodeIdx); } diff --git a/ydb/core/testlib/test_client.h b/ydb/core/testlib/test_client.h index fe9e3f42b9b..d461d403d8a 100644 --- a/ydb/core/testlib/test_client.h +++ b/ydb/core/testlib/test_client.h @@ -22,6 +22,7 @@ #include <ydb/core/testlib/basics/appdata.h> #include <ydb/core/protos/kesus.pb.h> #include <ydb/core/kesus/tablet/events.h> +#include <ydb/core/kqp/federated_query/kqp_federated_query_helpers.h> #include <ydb/core/security/ticket_parser.h> #include <ydb/core/base/grpc_service_factory.h> #include <ydb/core/persqueue/actor_persqueue_client_iface.h> @@ -137,6 +138,7 @@ namespace Tests { bool EnableMetering = false; TString MeteringFilePath; TString AwsRegion; + NKqp::IKqpFederatedQuerySetupFactory::TPtr FederatedQuerySetupFactory = std::make_shared<NKqp::TKqpFederatedQuerySetupFactoryNoop>(); std::function<IActor*(const NKikimrProto::TAuthConfig&)> CreateTicketParser = NKikimr::CreateTicketParser; std::shared_ptr<TGrpcServiceFactory> GrpcServiceFactory; @@ -179,6 +181,7 @@ namespace Tests { TServerSettings& SetChangesQueueBytesLimit(ui64 value) { ChangesQueueBytesLimit = value; return *this; } TServerSettings& SetMeteringFilePath(const TString& path) { EnableMetering = true; MeteringFilePath = path; return *this; } TServerSettings& SetAwsRegion(const TString& value) { AwsRegion = value; return *this; } + TServerSettings& SetFederatedQuerySetupFactory(NKqp::IKqpFederatedQuerySetupFactory::TPtr value) { FederatedQuerySetupFactory = value; return *this; } TServerSettings& SetPersQueueGetReadSessionsInfoWorkerFactory( std::shared_ptr<NKikimr::NMsgBusProxy::IPersQueueGetReadSessionsInfoWorkerFactory> factory ) { diff --git a/ydb/core/testlib/ya.make b/ydb/core/testlib/ya.make index 8f94d82c799..af028b07f5b 100644 --- a/ydb/core/testlib/ya.make +++ b/ydb/core/testlib/ya.make @@ -51,6 +51,7 @@ PEERDIR( ydb/core/kesus/tablet ydb/core/keyvalue ydb/core/kqp + ydb/core/kqp/federated_query ydb/core/metering ydb/core/mind ydb/core/mind/address_classification diff --git a/ydb/core/tx/datashard/datashard_ut_kqp_errors.cpp b/ydb/core/tx/datashard/datashard_ut_kqp_errors.cpp index 772d1c86b7e..ddf67987c24 100644 --- a/ydb/core/tx/datashard/datashard_ut_kqp_errors.cpp +++ b/ydb/core/tx/datashard/datashard_ut_kqp_errors.cpp @@ -50,7 +50,7 @@ public: Server = new TServer(serverSettings); Runtime = Server->GetRuntime(); - Runtime->SetLogPriority(NKikimrServices::KQP_RESOURCE_MANAGER, NLog::PRI_DEBUG); + Runtime->SetLogPriority(NKikimrServices::KQP_RESOURCE_MANAGER, NActors::NLog::PRI_DEBUG); TDispatchOptions rmReady; rmReady.CustomFinalCondition = [this] { @@ -65,7 +65,7 @@ public: }; Runtime->DispatchEvents(rmReady); - Runtime->SetLogPriority(NKikimrServices::KQP_RESOURCE_MANAGER, NLog::PRI_NOTICE); + Runtime->SetLogPriority(NKikimrServices::KQP_RESOURCE_MANAGER, NActors::NLog::PRI_NOTICE); // Runtime->SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_DEBUG); // Runtime->SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG); Runtime->SetLogPriority(NKikimrServices::KQP_EXECUTER, NActors::NLog::PRI_TRACE); diff --git a/ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver.h b/ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver.h index 4a58980d9d0..aaf2750bad1 100644 --- a/ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver.h +++ b/ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver.h @@ -88,6 +88,8 @@ struct TDatabaseResolverResponse { class IDatabaseAsyncResolver { public: + using TPtr = std::shared_ptr<IDatabaseAsyncResolver>; + using TDatabaseAuthMap = THashMap<std::pair<TString, EDatabaseType>, NYql::TDatabaseAuth>; virtual NThreading::TFuture<NYql::TDatabaseResolverResponse> ResolveIds(const TDatabaseAuthMap& ids) const = 0; |