aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorkardymon-d <kardymon-d@yandex-team.com>2023-11-08 22:42:47 +0300
committerkardymon-d <kardymon-d@yandex-team.com>2023-11-08 23:11:18 +0300
commit8dd9800abc35191e880e21d3e9af5017032589b9 (patch)
treea57e86bc6f04ea6329f1ea0b29eaa1c08448adaf
parent202a1ac82d1e5912c218f715e2b51eedca69d71b (diff)
downloadydb-8dd9800abc35191e880e21d3e9af5017032589b9.tar.gz
Поддержка yds в dedicated базах
Add resolver to pq
-rw-r--r--ydb/core/fq/libs/actors/database_resolver.cpp3
-rw-r--r--ydb/library/yql/providers/common/proto/gateways_config.proto8
-rw-r--r--ydb/library/yql/tools/dqrun/dqrun.cpp25
3 files changed, 25 insertions, 11 deletions
diff --git a/ydb/core/fq/libs/actors/database_resolver.cpp b/ydb/core/fq/libs/actors/database_resolver.cpp
index ba574d894d..8e3a50ea46 100644
--- a/ydb/core/fq/libs/actors/database_resolver.cpp
+++ b/ydb/core/fq/libs/actors/database_resolver.cpp
@@ -247,9 +247,10 @@ public:
Parsers[NYql::EDatabaseType::Ydb] = ydbParser;
Parsers[NYql::EDatabaseType::DataStreams] = [ydbParser](NJson::TJsonValue& databaseInfo, const NYql::IMdbEndpointGenerator::TPtr& mdbEndpointGenerator, bool useTls)
{
+ bool isDedicatedDb = databaseInfo.GetMap().contains("storageConfig");
auto ret = ydbParser(databaseInfo, mdbEndpointGenerator, useTls);
// TODO: Take explicit field from MVP
- if (ret.Endpoint.StartsWith("ydb.")) {
+ if (!isDedicatedDb && ret.Endpoint.StartsWith("ydb.")) {
// Replace "ydb." -> "yds."
ret.Endpoint[2] = 's';
}
diff --git a/ydb/library/yql/providers/common/proto/gateways_config.proto b/ydb/library/yql/providers/common/proto/gateways_config.proto
index 13b88763ea..21d6985c8f 100644
--- a/ydb/library/yql/providers/common/proto/gateways_config.proto
+++ b/ydb/library/yql/providers/common/proto/gateways_config.proto
@@ -615,6 +615,13 @@ message TGenericGatewayConfig {
reserved 1, 2;
}
+/////////////////////////////// Db Resolver ///////////////////////////////////
+
+message TDbResolverConfig {
+ // Ydb / Yds mvp endpoint
+ optional string YdbMvpEndpoint = 2;
+}
+
/////////////////////////////// Revision filter ///////////////////////////////
message TRevisionException {
@@ -652,4 +659,5 @@ message TGatewaysConfig {
optional TDbToolConfig DbTool = 20;
optional TGenericGatewayConfig Generic = 21;
optional TRevisionFilterConfig RevisionFilter = 22;
+ optional TDbResolverConfig DbResolver = 23;
}
diff --git a/ydb/library/yql/tools/dqrun/dqrun.cpp b/ydb/library/yql/tools/dqrun/dqrun.cpp
index e674e5abee..24f2acd24d 100644
--- a/ydb/library/yql/tools/dqrun/dqrun.cpp
+++ b/ydb/library/yql/tools/dqrun/dqrun.cpp
@@ -268,7 +268,7 @@ std::tuple<std::unique_ptr<TActorSystemManager>, TActorIds> RunActorSystem(
TActorIds actorIds;
// Run actor system only if necessary
- auto needActorSystem = gatewaysConfig.HasGeneric();
+ auto needActorSystem = gatewaysConfig.HasGeneric() || gatewaysConfig.HasDbResolver();
if (!needActorSystem) {
return std::make_tuple(std::move(actorSystemManager), std::move(actorIds));
}
@@ -279,7 +279,7 @@ std::tuple<std::unique_ptr<TActorSystemManager>, TActorIds> RunActorSystem(
actorSystemManager->Start();
// Actor system is initialized; start actor registration.
- if (gatewaysConfig.HasGeneric()) {
+ if (needActorSystem) {
auto httpProxy = NHttp::CreateHttpProxy();
actorIds.HttpProxy = actorSystemManager->GetActorSystem()->Register(httpProxy);
@@ -705,6 +705,17 @@ int RunMain(int argc, const char* argv[])
dataProvidersInit.push_back(GetClickHouseDataProviderInitializer(httpGateway));
}
+ std::shared_ptr<NFq::TDatabaseAsyncResolverImpl> dbResolver;
+ if (gatewaysConfig.HasDbResolver()) {
+ dbResolver = std::make_shared<NFq::TDatabaseAsyncResolverImpl>(
+ actorSystemManager->GetActorSystem(),
+ actorIds.DatabaseResolver,
+ gatewaysConfig.GetDbResolver().GetYdbMvpEndpoint(),
+ gatewaysConfig.HasGeneric() ? gatewaysConfig.GetGeneric().GetMdbGateway() : "",
+ NFq::MakeMdbEndpointGeneratorGeneric(false)
+ );
+ }
+
NConnector::IClient::TPtr genericClient;
if (gatewaysConfig.HasGeneric()) {
for (auto& cluster : *gatewaysConfig.MutableGeneric()->MutableClusterMapping()) {
@@ -712,13 +723,6 @@ int RunMain(int argc, const char* argv[])
}
genericClient = NConnector::MakeClientGRPC(gatewaysConfig.GetGeneric().GetConnector());
- auto dbResolver = std::make_shared<NFq::TDatabaseAsyncResolverImpl>(
- actorSystemManager->GetActorSystem(),
- actorIds.DatabaseResolver,
- "",
- gatewaysConfig.GetGeneric().GetMdbGateway(),
- NFq::MakeMdbEndpointGeneratorGeneric(false)
- );
dataProvidersInit.push_back(GetGenericDataProviderInitializer(genericClient, dbResolver));
}
@@ -752,7 +756,8 @@ int RunMain(int argc, const char* argv[])
for (auto& cluster: gatewaysConfig.GetPq().GetClusterMapping()) {
clusters.emplace(to_lower(cluster.GetName()), TString{PqProviderName});
}
- dataProvidersInit.push_back(GetPqDataProviderInitializer(pqGateway));
+
+ dataProvidersInit.push_back(GetPqDataProviderInitializer(pqGateway, false, dbResolver));
}
if (gatewaysConfig.HasSolomon()) {