diff options
author | grigoriypisar <[email protected]> | 2023-10-11 21:57:49 +0300 |
---|---|---|
committer | grigoriypisar <[email protected]> | 2023-10-11 22:13:04 +0300 |
commit | 0c1480f04a0176a3ab69027cc37e563a3438e80b (patch) | |
tree | 4b08cd30b931d746e7929cec66579b5f30941b4b | |
parent | 3f893ad4c0dfd90a7b02431d5d00752ba48923c9 (diff) |
Added connector settings
Added local connector
-rw-r--r-- | ydb/core/testlib/test_client.cpp | 44 | ||||
-rw-r--r-- | ydb/core/testlib/test_client.h | 2 |
2 files changed, 45 insertions, 1 deletions
diff --git a/ydb/core/testlib/test_client.cpp b/ydb/core/testlib/test_client.cpp index 0e1bbe3a53f..7712a714376 100644 --- a/ydb/core/testlib/test_client.cpp +++ b/ydb/core/testlib/test_client.cpp @@ -33,8 +33,11 @@ #include <ydb/services/persqueue_v1/topic.h> #include <ydb/services/persqueue_v1/grpc_pq_write.h> #include <ydb/services/monitoring/grpc_service.h> +#include <ydb/core/fq/libs/actors/database_resolver.h> #include <ydb/core/fq/libs/control_plane_proxy/control_plane_proxy.h> #include <ydb/core/fq/libs/control_plane_storage/control_plane_storage.h> +#include <ydb/core/fq/libs/db_id_async_resolver_impl/db_async_resolver_impl.h> +#include <ydb/core/fq/libs/db_id_async_resolver_impl/mdb_endpoint_generator.h> #include <ydb/core/blobstorage/base/blobstorage_events.h> #include <ydb/core/client/metadata/types_metadata.h> #include <ydb/core/client/metadata/functions_metadata.h> @@ -110,6 +113,7 @@ #include <ydb/core/client/server/msgbus_server_tracer.h> #include <ydb/core/client/server/ic_nodes_cache_service.h> +#include <library/cpp/actors/http/http_proxy.h> #include <library/cpp/actors/interconnect/interconnect.h> #include <library/cpp/grpc/server/actors/logger.h> @@ -836,13 +840,51 @@ namespace Tests { new NYql::NLog::TTlsLogBackend(new TNullLogBackend())); } + std::shared_ptr<NKikimr::NKqp::IKqpFederatedQuerySetupFactory> federatedQuerySetupFactory = Settings->FederatedQuerySetupFactory; + if (Settings->InitializeFederatedQuerySetupFactory) { + const auto& queryServiceConfig = Settings->AppConfig.GetQueryServiceConfig(); + + auto httpProxyActorId = NFq::MakeYqlAnalyticsHttpProxyId(); + Runtime->RegisterService( + httpProxyActorId, + Runtime->Register(NHttp::CreateHttpProxy(), nodeIdx), + nodeIdx + ); + + auto databaseResolverActorId = NFq::MakeDatabaseResolverActorId(); + Runtime->RegisterService( + databaseResolverActorId, + Runtime->Register(NFq::CreateDatabaseResolver(httpProxyActorId, nullptr), nodeIdx), + nodeIdx + ); + + std::shared_ptr<NFq::TDatabaseAsyncResolverImpl> databaseAsyncResolver; + if (queryServiceConfig.HasMdbGateway() && queryServiceConfig.HasMdbTransformHost()) { + databaseAsyncResolver = std::make_shared<NFq::TDatabaseAsyncResolverImpl>( + Runtime->GetActorSystem(nodeIdx), + databaseResolverActorId, + "", + queryServiceConfig.GetMdbGateway(), + NFq::MakeMdbEndpointGeneratorGeneric(queryServiceConfig.GetMdbTransformHost()) + ); + } + + federatedQuerySetupFactory = std::make_shared<NKikimr::NKqp::TKqpFederatedQuerySetupFactoryMock>( + NYql::IHTTPGateway::Make(&queryServiceConfig.GetHttpGateway()), + NYql::NConnector::MakeClientGRPC(queryServiceConfig.GetConnector()), + nullptr, + databaseAsyncResolver, + queryServiceConfig.GetS3() + ); + } + IActor* kqpProxyService = NKqp::CreateKqpProxyService(Settings->AppConfig.GetLogConfig(), Settings->AppConfig.GetTableServiceConfig(), Settings->AppConfig.GetQueryServiceConfig(), Settings->AppConfig.GetMetadataProviderConfig(), TVector<NKikimrKqp::TKqpSetting>(Settings->KqpSettings), nullptr, std::move(kqpProxySharedResources), - Settings->FederatedQuerySetupFactory); + federatedQuerySetupFactory); TActorId kqpProxyServiceId = Runtime->Register(kqpProxyService, nodeIdx); Runtime->RegisterService(NKqp::MakeKqpProxyID(Runtime->GetNodeId(nodeIdx)), kqpProxyServiceId, nodeIdx); } diff --git a/ydb/core/testlib/test_client.h b/ydb/core/testlib/test_client.h index b6a05b5d61d..5b44a07b8fd 100644 --- a/ydb/core/testlib/test_client.h +++ b/ydb/core/testlib/test_client.h @@ -139,6 +139,7 @@ namespace Tests { TString MeteringFilePath; TString AwsRegion; NKqp::IKqpFederatedQuerySetupFactory::TPtr FederatedQuerySetupFactory = std::make_shared<NKqp::TKqpFederatedQuerySetupFactoryNoop>(); + bool InitializeFederatedQuerySetupFactory = false; std::function<IActor*(const NKikimrProto::TAuthConfig&)> CreateTicketParser = NKikimr::CreateTicketParser; std::shared_ptr<TGrpcServiceFactory> GrpcServiceFactory; @@ -182,6 +183,7 @@ namespace Tests { TServerSettings& SetMeteringFilePath(const TString& path) { EnableMetering = true; MeteringFilePath = path; return *this; } TServerSettings& SetAwsRegion(const TString& value) { AwsRegion = value; return *this; } TServerSettings& SetFederatedQuerySetupFactory(NKqp::IKqpFederatedQuerySetupFactory::TPtr value) { FederatedQuerySetupFactory = value; return *this; } + TServerSettings& SetInitializeFederatedQuerySetupFactory(bool value) { InitializeFederatedQuerySetupFactory = value; return *this; } TServerSettings& SetPersQueueGetReadSessionsInfoWorkerFactory( std::shared_ptr<NKikimr::NMsgBusProxy::IPersQueueGetReadSessionsInfoWorkerFactory> factory ) { |