diff options
author | kardymon-d <kardymon-d@yandex-team.com> | 2023-11-08 22:42:47 +0300 |
---|---|---|
committer | kardymon-d <kardymon-d@yandex-team.com> | 2023-11-08 23:11:18 +0300 |
commit | 8dd9800abc35191e880e21d3e9af5017032589b9 (patch) | |
tree | a57e86bc6f04ea6329f1ea0b29eaa1c08448adaf | |
parent | 202a1ac82d1e5912c218f715e2b51eedca69d71b (diff) | |
download | ydb-8dd9800abc35191e880e21d3e9af5017032589b9.tar.gz |
Поддержка yds в dedicated базах
Add resolver to pq
-rw-r--r-- | ydb/core/fq/libs/actors/database_resolver.cpp | 3 | ||||
-rw-r--r-- | ydb/library/yql/providers/common/proto/gateways_config.proto | 8 | ||||
-rw-r--r-- | ydb/library/yql/tools/dqrun/dqrun.cpp | 25 |
3 files changed, 25 insertions, 11 deletions
diff --git a/ydb/core/fq/libs/actors/database_resolver.cpp b/ydb/core/fq/libs/actors/database_resolver.cpp index ba574d894d..8e3a50ea46 100644 --- a/ydb/core/fq/libs/actors/database_resolver.cpp +++ b/ydb/core/fq/libs/actors/database_resolver.cpp @@ -247,9 +247,10 @@ public: Parsers[NYql::EDatabaseType::Ydb] = ydbParser; Parsers[NYql::EDatabaseType::DataStreams] = [ydbParser](NJson::TJsonValue& databaseInfo, const NYql::IMdbEndpointGenerator::TPtr& mdbEndpointGenerator, bool useTls) { + bool isDedicatedDb = databaseInfo.GetMap().contains("storageConfig"); auto ret = ydbParser(databaseInfo, mdbEndpointGenerator, useTls); // TODO: Take explicit field from MVP - if (ret.Endpoint.StartsWith("ydb.")) { + if (!isDedicatedDb && ret.Endpoint.StartsWith("ydb.")) { // Replace "ydb." -> "yds." ret.Endpoint[2] = 's'; } diff --git a/ydb/library/yql/providers/common/proto/gateways_config.proto b/ydb/library/yql/providers/common/proto/gateways_config.proto index 13b88763ea..21d6985c8f 100644 --- a/ydb/library/yql/providers/common/proto/gateways_config.proto +++ b/ydb/library/yql/providers/common/proto/gateways_config.proto @@ -615,6 +615,13 @@ message TGenericGatewayConfig { reserved 1, 2; } +/////////////////////////////// Db Resolver /////////////////////////////////// + +message TDbResolverConfig { + // Ydb / Yds mvp endpoint + optional string YdbMvpEndpoint = 2; +} + /////////////////////////////// Revision filter /////////////////////////////// message TRevisionException { @@ -652,4 +659,5 @@ message TGatewaysConfig { optional TDbToolConfig DbTool = 20; optional TGenericGatewayConfig Generic = 21; optional TRevisionFilterConfig RevisionFilter = 22; + optional TDbResolverConfig DbResolver = 23; } diff --git a/ydb/library/yql/tools/dqrun/dqrun.cpp b/ydb/library/yql/tools/dqrun/dqrun.cpp index e674e5abee..24f2acd24d 100644 --- a/ydb/library/yql/tools/dqrun/dqrun.cpp +++ b/ydb/library/yql/tools/dqrun/dqrun.cpp @@ -268,7 +268,7 @@ std::tuple<std::unique_ptr<TActorSystemManager>, TActorIds> RunActorSystem( TActorIds actorIds; // Run actor system only if necessary - auto needActorSystem = gatewaysConfig.HasGeneric(); + auto needActorSystem = gatewaysConfig.HasGeneric() || gatewaysConfig.HasDbResolver(); if (!needActorSystem) { return std::make_tuple(std::move(actorSystemManager), std::move(actorIds)); } @@ -279,7 +279,7 @@ std::tuple<std::unique_ptr<TActorSystemManager>, TActorIds> RunActorSystem( actorSystemManager->Start(); // Actor system is initialized; start actor registration. - if (gatewaysConfig.HasGeneric()) { + if (needActorSystem) { auto httpProxy = NHttp::CreateHttpProxy(); actorIds.HttpProxy = actorSystemManager->GetActorSystem()->Register(httpProxy); @@ -705,6 +705,17 @@ int RunMain(int argc, const char* argv[]) dataProvidersInit.push_back(GetClickHouseDataProviderInitializer(httpGateway)); } + std::shared_ptr<NFq::TDatabaseAsyncResolverImpl> dbResolver; + if (gatewaysConfig.HasDbResolver()) { + dbResolver = std::make_shared<NFq::TDatabaseAsyncResolverImpl>( + actorSystemManager->GetActorSystem(), + actorIds.DatabaseResolver, + gatewaysConfig.GetDbResolver().GetYdbMvpEndpoint(), + gatewaysConfig.HasGeneric() ? gatewaysConfig.GetGeneric().GetMdbGateway() : "", + NFq::MakeMdbEndpointGeneratorGeneric(false) + ); + } + NConnector::IClient::TPtr genericClient; if (gatewaysConfig.HasGeneric()) { for (auto& cluster : *gatewaysConfig.MutableGeneric()->MutableClusterMapping()) { @@ -712,13 +723,6 @@ int RunMain(int argc, const char* argv[]) } genericClient = NConnector::MakeClientGRPC(gatewaysConfig.GetGeneric().GetConnector()); - auto dbResolver = std::make_shared<NFq::TDatabaseAsyncResolverImpl>( - actorSystemManager->GetActorSystem(), - actorIds.DatabaseResolver, - "", - gatewaysConfig.GetGeneric().GetMdbGateway(), - NFq::MakeMdbEndpointGeneratorGeneric(false) - ); dataProvidersInit.push_back(GetGenericDataProviderInitializer(genericClient, dbResolver)); } @@ -752,7 +756,8 @@ int RunMain(int argc, const char* argv[]) for (auto& cluster: gatewaysConfig.GetPq().GetClusterMapping()) { clusters.emplace(to_lower(cluster.GetName()), TString{PqProviderName}); } - dataProvidersInit.push_back(GetPqDataProviderInitializer(pqGateway)); + + dataProvidersInit.push_back(GetPqDataProviderInitializer(pqGateway, false, dbResolver)); } if (gatewaysConfig.HasSolomon()) { |