summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorgrigoriypisar <[email protected]>2023-10-11 21:57:49 +0300
committergrigoriypisar <[email protected]>2023-10-11 22:13:04 +0300
commit0c1480f04a0176a3ab69027cc37e563a3438e80b (patch)
tree4b08cd30b931d746e7929cec66579b5f30941b4b
parent3f893ad4c0dfd90a7b02431d5d00752ba48923c9 (diff)
Added connector settings
Added local connector
-rw-r--r--ydb/core/testlib/test_client.cpp44
-rw-r--r--ydb/core/testlib/test_client.h2
2 files changed, 45 insertions, 1 deletions
diff --git a/ydb/core/testlib/test_client.cpp b/ydb/core/testlib/test_client.cpp
index 0e1bbe3a53f..7712a714376 100644
--- a/ydb/core/testlib/test_client.cpp
+++ b/ydb/core/testlib/test_client.cpp
@@ -33,8 +33,11 @@
#include <ydb/services/persqueue_v1/topic.h>
#include <ydb/services/persqueue_v1/grpc_pq_write.h>
#include <ydb/services/monitoring/grpc_service.h>
+#include <ydb/core/fq/libs/actors/database_resolver.h>
#include <ydb/core/fq/libs/control_plane_proxy/control_plane_proxy.h>
#include <ydb/core/fq/libs/control_plane_storage/control_plane_storage.h>
+#include <ydb/core/fq/libs/db_id_async_resolver_impl/db_async_resolver_impl.h>
+#include <ydb/core/fq/libs/db_id_async_resolver_impl/mdb_endpoint_generator.h>
#include <ydb/core/blobstorage/base/blobstorage_events.h>
#include <ydb/core/client/metadata/types_metadata.h>
#include <ydb/core/client/metadata/functions_metadata.h>
@@ -110,6 +113,7 @@
#include <ydb/core/client/server/msgbus_server_tracer.h>
#include <ydb/core/client/server/ic_nodes_cache_service.h>
+#include <library/cpp/actors/http/http_proxy.h>
#include <library/cpp/actors/interconnect/interconnect.h>
#include <library/cpp/grpc/server/actors/logger.h>
@@ -836,13 +840,51 @@ namespace Tests {
new NYql::NLog::TTlsLogBackend(new TNullLogBackend()));
}
+ std::shared_ptr<NKikimr::NKqp::IKqpFederatedQuerySetupFactory> federatedQuerySetupFactory = Settings->FederatedQuerySetupFactory;
+ if (Settings->InitializeFederatedQuerySetupFactory) {
+ const auto& queryServiceConfig = Settings->AppConfig.GetQueryServiceConfig();
+
+ auto httpProxyActorId = NFq::MakeYqlAnalyticsHttpProxyId();
+ Runtime->RegisterService(
+ httpProxyActorId,
+ Runtime->Register(NHttp::CreateHttpProxy(), nodeIdx),
+ nodeIdx
+ );
+
+ auto databaseResolverActorId = NFq::MakeDatabaseResolverActorId();
+ Runtime->RegisterService(
+ databaseResolverActorId,
+ Runtime->Register(NFq::CreateDatabaseResolver(httpProxyActorId, nullptr), nodeIdx),
+ nodeIdx
+ );
+
+ std::shared_ptr<NFq::TDatabaseAsyncResolverImpl> databaseAsyncResolver;
+ if (queryServiceConfig.HasMdbGateway() && queryServiceConfig.HasMdbTransformHost()) {
+ databaseAsyncResolver = std::make_shared<NFq::TDatabaseAsyncResolverImpl>(
+ Runtime->GetActorSystem(nodeIdx),
+ databaseResolverActorId,
+ "",
+ queryServiceConfig.GetMdbGateway(),
+ NFq::MakeMdbEndpointGeneratorGeneric(queryServiceConfig.GetMdbTransformHost())
+ );
+ }
+
+ federatedQuerySetupFactory = std::make_shared<NKikimr::NKqp::TKqpFederatedQuerySetupFactoryMock>(
+ NYql::IHTTPGateway::Make(&queryServiceConfig.GetHttpGateway()),
+ NYql::NConnector::MakeClientGRPC(queryServiceConfig.GetConnector()),
+ nullptr,
+ databaseAsyncResolver,
+ queryServiceConfig.GetS3()
+ );
+ }
+
IActor* kqpProxyService = NKqp::CreateKqpProxyService(Settings->AppConfig.GetLogConfig(),
Settings->AppConfig.GetTableServiceConfig(),
Settings->AppConfig.GetQueryServiceConfig(),
Settings->AppConfig.GetMetadataProviderConfig(),
TVector<NKikimrKqp::TKqpSetting>(Settings->KqpSettings),
nullptr, std::move(kqpProxySharedResources),
- Settings->FederatedQuerySetupFactory);
+ federatedQuerySetupFactory);
TActorId kqpProxyServiceId = Runtime->Register(kqpProxyService, nodeIdx);
Runtime->RegisterService(NKqp::MakeKqpProxyID(Runtime->GetNodeId(nodeIdx)), kqpProxyServiceId, nodeIdx);
}
diff --git a/ydb/core/testlib/test_client.h b/ydb/core/testlib/test_client.h
index b6a05b5d61d..5b44a07b8fd 100644
--- a/ydb/core/testlib/test_client.h
+++ b/ydb/core/testlib/test_client.h
@@ -139,6 +139,7 @@ namespace Tests {
TString MeteringFilePath;
TString AwsRegion;
NKqp::IKqpFederatedQuerySetupFactory::TPtr FederatedQuerySetupFactory = std::make_shared<NKqp::TKqpFederatedQuerySetupFactoryNoop>();
+ bool InitializeFederatedQuerySetupFactory = false;
std::function<IActor*(const NKikimrProto::TAuthConfig&)> CreateTicketParser = NKikimr::CreateTicketParser;
std::shared_ptr<TGrpcServiceFactory> GrpcServiceFactory;
@@ -182,6 +183,7 @@ namespace Tests {
TServerSettings& SetMeteringFilePath(const TString& path) { EnableMetering = true; MeteringFilePath = path; return *this; }
TServerSettings& SetAwsRegion(const TString& value) { AwsRegion = value; return *this; }
TServerSettings& SetFederatedQuerySetupFactory(NKqp::IKqpFederatedQuerySetupFactory::TPtr value) { FederatedQuerySetupFactory = value; return *this; }
+ TServerSettings& SetInitializeFederatedQuerySetupFactory(bool value) { InitializeFederatedQuerySetupFactory = value; return *this; }
TServerSettings& SetPersQueueGetReadSessionsInfoWorkerFactory(
std::shared_ptr<NKikimr::NMsgBusProxy::IPersQueueGetReadSessionsInfoWorkerFactory> factory
) {