diff options
author | vitalyisaev <[email protected]> | 2023-07-11 15:37:20 +0300 |
---|---|---|
committer | vitalyisaev <[email protected]> | 2023-07-11 15:37:20 +0300 |
commit | 208ce864f80a4e05a6dc4e1fe1145075a7e654de (patch) | |
tree | 68e86a239dce25bc804ccf64476e0438cf577372 | |
parent | 765740530ffb3fc45a4621fea927d01f233811ac (diff) |
YQ Connector: support managed PostgreSQL in dqrun
YQ Connector: support managed PostgreSQL
46 files changed, 287 insertions, 137 deletions
diff --git a/ydb/core/fq/libs/actors/database_resolver.cpp b/ydb/core/fq/libs/actors/database_resolver.cpp index 41341eea4b7..ee5aaccd8cd 100644 --- a/ydb/core/fq/libs/actors/database_resolver.cpp +++ b/ydb/core/fq/libs/actors/database_resolver.cpp @@ -18,14 +18,12 @@ namespace NFq { using namespace NActors; using namespace NYql; -using TEndpoint = NYql::TDbResolverResponse::TEndpoint; +using TEndpoint = NYql::TDatabaseResolverResponse::TEndpoint; -using TParsers = THashMap<NYql::DatabaseType, std::function<TEndpoint(NJson::TJsonValue& body, bool)>>; -using TCache = TTtlCache<std::tuple<TString, NYql::DatabaseType, NYql::TDatabaseAuth>, std::variant<TEndpoint, TString>>; +using TParser = std::function<TEndpoint(NJson::TJsonValue& body, const NYql::IMdbHostTransformer::TPtr& mdbHostTransformer)>; +using TParsers = THashMap<NYql::EDatabaseType, TParser>; +using TCache = TTtlCache<std::tuple<TString, NYql::EDatabaseType, NYql::TDatabaseAuth>, std::variant<TEndpoint, TString>>; -TString TransformMdbHostToCorrectFormat(const TString& mdbHost) { - return mdbHost.substr(0, mdbHost.find('.')) + ".db.yandex.net:8443"; -} class TResponseProcessor : public TActorBootstrapped<TResponseProcessor> { @@ -37,16 +35,16 @@ public: TResponseProcessor( const TActorId sender, TCache& cache, - const TDbResolverResponse::TDatabaseEndpointsMap& ready, - const THashMap<NHttp::THttpOutgoingRequestPtr, std::tuple<TString, NYql::DatabaseType, NYql::TDatabaseAuth>>& requests, + const TDatabaseResolverResponse::TDatabaseEndpointsMap& ready, + const THashMap<NHttp::THttpOutgoingRequestPtr, std::tuple<TString, NYql::EDatabaseType, NYql::TDatabaseAuth>>& requests, const TString& traceId, - bool mdbTransformHost, + const NYql::IMdbHostTransformer::TPtr& mdbHostTransformer, const TParsers& parsers) : Sender(sender) , Cache(cache) , Requests(requests) , TraceId(traceId) - , MdbTransformHost(mdbTransformHost) + , MdbHostTransformer(mdbHostTransformer) , DatabaseId2Endpoint(ready) , Parsers(parsers) { } @@ -102,7 +100,7 @@ private: Send(Sender, new TEvents::TEvEndpointResponse( - NYql::TDbResolverResponse(std::move(DatabaseId2Endpoint), Success, issues))); + NYql::TDatabaseResolverResponse(std::move(DatabaseId2Endpoint), Success, issues))); PassAway(); } @@ -111,7 +109,7 @@ private: TString status; TString errorMessage; TString databaseId; - NYql::DatabaseType databaseType = DatabaseType::Ydb; + NYql::EDatabaseType databaseType = EDatabaseType::Ydb; NYql::TDatabaseAuth info; TMaybe<TEndpoint> result; HandledIds++; @@ -129,9 +127,9 @@ private: TParsers::const_iterator parserIt; if (parseJsonOk && (parserIt = Parsers.find(databaseType)) != Parsers.end()) { try { - auto res = parserIt->second(databaseInfo, MdbTransformHost); + auto res = parserIt->second(databaseInfo, MdbHostTransformer); LOG_D("Got db_id: " << databaseId - << ", db type: " << static_cast<std::underlying_type<NYql::DatabaseType>::type>(databaseType) + << ", db type: " << static_cast<std::underlying_type<NYql::EDatabaseType>::type>(databaseType) << ", endpoint: " << res.Endpoint << ", database: " << res.Database); DatabaseId2Endpoint[std::make_pair(databaseId, databaseType)] = res; @@ -140,13 +138,13 @@ private: errorMessage = TStringBuilder() << " Couldn't resolve " << "databaseId: " << databaseId - << ", db type: " << static_cast<std::underlying_type<NYql::DatabaseType>::type>(databaseType) << "\n" + << ", db type: " << static_cast<std::underlying_type<NYql::EDatabaseType>::type>(databaseType) << "\n" << CurrentExceptionMessage(); } } else { errorMessage = TStringBuilder() << "Unable to parse database information." << "Database Id: " << databaseId - << ", db type: " << static_cast<std::underlying_type<NYql::DatabaseType>::type>(databaseType); + << ", db type: " << static_cast<std::underlying_type<NYql::EDatabaseType>::type>(databaseType); } } } else { @@ -184,10 +182,10 @@ private: private: const TActorId Sender; TCache& Cache; - const THashMap<NHttp::THttpOutgoingRequestPtr, std::tuple<TString, NYql::DatabaseType, NYql::TDatabaseAuth>> Requests; + const THashMap<NHttp::THttpOutgoingRequestPtr, std::tuple<TString, NYql::EDatabaseType, NYql::TDatabaseAuth>> Requests; const TString TraceId; - const bool MdbTransformHost; - TDbResolverResponse::TDatabaseEndpointsMap DatabaseId2Endpoint; + const NYql::IMdbHostTransformer::TPtr MdbHostTransformer; + TDatabaseResolverResponse::TDatabaseEndpointsMap DatabaseId2Endpoint; size_t HandledIds = 0; bool Success = true; const TParsers& Parsers; @@ -207,7 +205,7 @@ public: .SetErrorTtl(TDuration::Minutes(1)) .SetMaxSize(1000000)) { - auto ydbParser = [](NJson::TJsonValue& databaseInfo, bool) { + auto ydbParser = [](NJson::TJsonValue& databaseInfo, const NYql::IMdbHostTransformer::TPtr&) { bool secure = false; TString endpoint = databaseInfo.GetMap().at("endpoint").GetStringRobust(); TString prefix("/?database="); @@ -228,10 +226,10 @@ public: Y_ENSURE(endpoint); return TEndpoint{endpoint, database, secure}; }; - Parsers[NYql::DatabaseType::Ydb] = ydbParser; - Parsers[NYql::DatabaseType::DataStreams] = [ydbParser](NJson::TJsonValue& databaseInfo, bool mdbTransformHost) + Parsers[NYql::EDatabaseType::Ydb] = ydbParser; + Parsers[NYql::EDatabaseType::DataStreams] = [ydbParser](NJson::TJsonValue& databaseInfo, const NYql::IMdbHostTransformer::TPtr& mdbHostTransformer) { - auto ret = ydbParser(databaseInfo, mdbTransformHost); + auto ret = ydbParser(databaseInfo, mdbHostTransformer); // TODO: Take explicit field from MVP if (ret.Endpoint.StartsWith("ydb.")) { // Replace "ydb." -> "yds." @@ -239,7 +237,7 @@ public: } return ret; }; - Parsers[NYql::DatabaseType::ClickHouse] = [](NJson::TJsonValue& databaseInfo, bool mdbTransformHost) { + Parsers[NYql::EDatabaseType::ClickHouse] = [](NJson::TJsonValue& databaseInfo, const NYql::IMdbHostTransformer::TPtr& mdbHostTransformer) { TString endpoint; TVector<TString> aliveHosts; for (const auto& host : databaseInfo.GetMap().at("hosts").GetArraySafe()) { @@ -248,37 +246,44 @@ public: } } if (!aliveHosts.empty()) { - endpoint = aliveHosts[std::rand() % static_cast<int>(aliveHosts.size())]; + endpoint = mdbHostTransformer->ToEndpoint( + NYql::EDatabaseType::ClickHouse, + aliveHosts[std::rand() % static_cast<int>(aliveHosts.size())] + ); } if (!endpoint) { - ythrow yexception() << "No ALIVE ClickHouse hosts exist"; + ythrow yexception() << "No ALIVE ClickHouse hosts found"; } - endpoint = mdbTransformHost ? TransformMdbHostToCorrectFormat(endpoint) : endpoint; return TEndpoint{endpoint, "", true}; }; - // TODO: https://st.yandex-team.ru/YQ-2171: support other data sources than ClickHouse - Parsers[NYql::DatabaseType::Generic] = [](NJson::TJsonValue& databaseInfo, bool mdbTransformHost) { + Parsers[NYql::EDatabaseType::PostgreSQL] = [](NJson::TJsonValue& databaseInfo, const NYql::IMdbHostTransformer::TPtr& mdbHostTransformer) { TString endpoint; TVector<TString> aliveHosts; - for (const auto& host : databaseInfo.GetMap().at("hosts").GetArraySafe()) { - if (host["health"].GetString() == "ALIVE" && host["type"].GetString() == "CLICKHOUSE") { + + // all host services must be alive + bool alive = true; + for (const auto& service: host.GetMap().at("services").GetArraySafe()) { + if (service["health"].GetString() != "ALIVE") { + alive = false; + break; + } + } + + if (alive) { aliveHosts.push_back(host["name"].GetString()); } } - if (!aliveHosts.empty()) { - endpoint = aliveHosts[std::rand() % static_cast<int>(aliveHosts.size())]; + endpoint = mdbHostTransformer->ToEndpoint( + NYql::EDatabaseType::PostgreSQL, + aliveHosts[std::rand() % static_cast<int>(aliveHosts.size())] + ); } if (!endpoint) { - ythrow yexception() << "No ALIVE database host exists"; - } - - if (mdbTransformHost) { - // TODO: https://st.yandex-team.ru/YQ-2170: support secure connections on 9440 - endpoint += ":9000"; + ythrow yexception() << "No ALIVE PostgreSQL hosts found"; } return TEndpoint{endpoint, "", true}; @@ -295,7 +300,7 @@ private: void SendResponse( const NActors::TActorId& recipient, - TDbResolverResponse::TDatabaseEndpointsMap&& ready, + TDatabaseResolverResponse::TDatabaseEndpointsMap&& ready, bool success = true, const TString& errorMessage = "") { @@ -304,15 +309,15 @@ private: if (errorMessage) issues.AddIssue(errorMessage); Send(recipient, - new TEvents::TEvEndpointResponse(NYql::TDbResolverResponse{std::move(ready), success, issues})); + new TEvents::TEvEndpointResponse(NYql::TDatabaseResolverResponse{std::move(ready), success, issues})); } void Handle(TEvents::TEvEndpointRequest::TPtr ev) { TraceId = ev->Get()->TraceId; LOG_D("Start databaseId resolver for " << ev->Get()->DatabaseIds.size() << " ids"); - THashMap<NHttp::THttpOutgoingRequestPtr, std::tuple<TString, NYql::DatabaseType, NYql::TDatabaseAuth>> requests; // request, (dbId, type, info) - TDbResolverResponse::TDatabaseEndpointsMap ready; + THashMap<NHttp::THttpOutgoingRequestPtr, std::tuple<TString, NYql::EDatabaseType, NYql::TDatabaseAuth>> requests; // request, (dbId, type, info) + TDatabaseResolverResponse::TDatabaseEndpointsMap ready; for (const auto& [p, info] : ev->Get()->DatabaseIds) { const auto& [databaseId, type] = p; TMaybe<std::variant<TEndpoint, TString>> cacheVal; @@ -336,13 +341,14 @@ private: try { TString url; - if (IsIn({NYql::DatabaseType::Ydb, NYql::DatabaseType::DataStreams }, type)) { + if (IsIn({NYql::EDatabaseType::Ydb, NYql::EDatabaseType::DataStreams }, type)) { url = TUrlBuilder(ev->Get()->YdbMvpEndpoint + "/database") .AddUrlParam("databaseId", databaseId) .Build(); - } else { + } else if (IsIn({NYql::EDatabaseType::ClickHouse, NYql::EDatabaseType::PostgreSQL }, type)) { YQL_ENSURE(ev->Get()->MdbGateway, "empty MDB Gateway"); - url = TUrlBuilder(ev->Get()->MdbGateway + "/managed-clickhouse/v1/clusters/") + url = TUrlBuilder( + ev->Get()->MdbGateway + "/managed-" + NYql::DatabaseTypeToString(type) + "/v1/clusters/") .AddPathComponent(databaseId) .AddPathComponent("hosts") .Build(); @@ -369,7 +375,7 @@ private: if (!requests.empty()) { auto helper = Register( - new TResponseProcessor(ev->Sender, Cache, ready, requests, TraceId, ev->Get()->MdbTransformHost, Parsers)); + new TResponseProcessor(ev->Sender, Cache, ready, requests, TraceId, ev->Get()->MdbHostTransformer, Parsers)); for (const auto& [request, _] : requests) { TActivationContext::Send(new IEventHandle(HttpProxy, helper, new NHttp::TEvHttpProxy::TEvHttpOutgoingRequest(request))); diff --git a/ydb/core/fq/libs/actors/database_resolver.h b/ydb/core/fq/libs/actors/database_resolver.h index cdabf7194ac..21035416820 100644 --- a/ydb/core/fq/libs/actors/database_resolver.h +++ b/ydb/core/fq/libs/actors/database_resolver.h @@ -6,8 +6,6 @@ namespace NFq { -TString TransformMdbHostToCorrectFormat(const TString& mdbHost); - NActors::IActor* CreateDatabaseResolver(NActors::TActorId httpProxy, NYql::ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory); } /* namespace NFq */ diff --git a/ydb/core/fq/libs/actors/run_actor.cpp b/ydb/core/fq/libs/actors/run_actor.cpp index 3e6d6e29286..1399465abd0 100644 --- a/ydb/core/fq/libs/actors/run_actor.cpp +++ b/ydb/core/fq/libs/actors/run_actor.cpp @@ -59,6 +59,7 @@ #include <ydb/core/fq/libs/control_plane_storage/events/events.h> #include <ydb/core/fq/libs/control_plane_storage/util.h> #include <ydb/core/fq/libs/db_id_async_resolver_impl/db_async_resolver_impl.h> +#include <ydb/core/fq/libs/db_id_async_resolver_impl/mdb_host_transformer.h> #include <ydb/core/fq/libs/gateway/empty_gateway.h> #include <ydb/core/fq/libs/private_client/events.h> #include <ydb/core/fq/libs/private_client/private_client.h> @@ -1824,8 +1825,14 @@ private: clusters); TVector<TDataProviderInitializer> dataProvidersInit; - const std::shared_ptr<IDatabaseAsyncResolver> dbResolver = std::make_shared<TDatabaseAsyncResolverImpl>(NActors::TActivationContext::ActorSystem(), Params.DatabaseResolver, - Params.Config.GetCommon().GetYdbMvpCloudEndpoint(), Params.Config.GetCommon().GetMdbGateway(), Params.Config.GetCommon().GetMdbTransformHost(), Params.QueryId); + const std::shared_ptr<IDatabaseAsyncResolver> dbResolver = std::make_shared<TDatabaseAsyncResolverImpl>( + NActors::TActivationContext::ActorSystem(), + Params.DatabaseResolver, + Params.Config.GetCommon().GetYdbMvpCloudEndpoint(), + Params.Config.GetCommon().GetMdbGateway(), + NFq::MakeTMdbHostTransformerGeneric(), + // Params.Config.GetCommon().GetMdbTransformHost(), + Params.QueryId); { // TBD: move init to better place QueryStateUpdateRequest.set_scope(Params.Scope.ToString()); diff --git a/ydb/core/fq/libs/config/protos/common.proto b/ydb/core/fq/libs/config/protos/common.proto index 7bb6da3577b..ade8d5562c7 100644 --- a/ydb/core/fq/libs/config/protos/common.proto +++ b/ydb/core/fq/libs/config/protos/common.proto @@ -17,7 +17,7 @@ message TCommonConfig { string YdbMvpCloudEndpoint = 2; string MdbGateway = 3; bool UseBearerForYdb = 4; - bool MdbTransformHost = 5; + bool MdbTransformHost = 5 [deprecated = true]; // TODO: remove it in https://st.yandex-team.ru/YQ-2229 string ObjectStorageEndpoint = 6; string IdsPrefix = 7; uint64 MaxTasksPerOperation = 8; diff --git a/ydb/core/fq/libs/db_id_async_resolver_impl/CMakeLists.darwin-x86_64.txt b/ydb/core/fq/libs/db_id_async_resolver_impl/CMakeLists.darwin-x86_64.txt index ec08e5f4a83..3c27829d986 100644 --- a/ydb/core/fq/libs/db_id_async_resolver_impl/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/fq/libs/db_id_async_resolver_impl/CMakeLists.darwin-x86_64.txt @@ -21,4 +21,5 @@ target_link_libraries(fq-libs-db_id_async_resolver_impl PUBLIC ) target_sources(fq-libs-db_id_async_resolver_impl PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/db_id_async_resolver_impl/db_async_resolver_impl.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/db_id_async_resolver_impl/mdb_host_transformer.cpp ) diff --git a/ydb/core/fq/libs/db_id_async_resolver_impl/CMakeLists.linux-aarch64.txt b/ydb/core/fq/libs/db_id_async_resolver_impl/CMakeLists.linux-aarch64.txt index 1b9b50a5624..a26a3f5d590 100644 --- a/ydb/core/fq/libs/db_id_async_resolver_impl/CMakeLists.linux-aarch64.txt +++ b/ydb/core/fq/libs/db_id_async_resolver_impl/CMakeLists.linux-aarch64.txt @@ -22,4 +22,5 @@ target_link_libraries(fq-libs-db_id_async_resolver_impl PUBLIC ) target_sources(fq-libs-db_id_async_resolver_impl PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/db_id_async_resolver_impl/db_async_resolver_impl.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/db_id_async_resolver_impl/mdb_host_transformer.cpp ) diff --git a/ydb/core/fq/libs/db_id_async_resolver_impl/CMakeLists.linux-x86_64.txt b/ydb/core/fq/libs/db_id_async_resolver_impl/CMakeLists.linux-x86_64.txt index 1b9b50a5624..a26a3f5d590 100644 --- a/ydb/core/fq/libs/db_id_async_resolver_impl/CMakeLists.linux-x86_64.txt +++ b/ydb/core/fq/libs/db_id_async_resolver_impl/CMakeLists.linux-x86_64.txt @@ -22,4 +22,5 @@ target_link_libraries(fq-libs-db_id_async_resolver_impl PUBLIC ) target_sources(fq-libs-db_id_async_resolver_impl PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/db_id_async_resolver_impl/db_async_resolver_impl.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/db_id_async_resolver_impl/mdb_host_transformer.cpp ) diff --git a/ydb/core/fq/libs/db_id_async_resolver_impl/CMakeLists.windows-x86_64.txt b/ydb/core/fq/libs/db_id_async_resolver_impl/CMakeLists.windows-x86_64.txt index ec08e5f4a83..3c27829d986 100644 --- a/ydb/core/fq/libs/db_id_async_resolver_impl/CMakeLists.windows-x86_64.txt +++ b/ydb/core/fq/libs/db_id_async_resolver_impl/CMakeLists.windows-x86_64.txt @@ -21,4 +21,5 @@ target_link_libraries(fq-libs-db_id_async_resolver_impl PUBLIC ) target_sources(fq-libs-db_id_async_resolver_impl PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/db_id_async_resolver_impl/db_async_resolver_impl.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/db_id_async_resolver_impl/mdb_host_transformer.cpp ) diff --git a/ydb/core/fq/libs/db_id_async_resolver_impl/db_async_resolver_impl.cpp b/ydb/core/fq/libs/db_id_async_resolver_impl/db_async_resolver_impl.cpp index 2fcacfe931b..4621a2c4be0 100644 --- a/ydb/core/fq/libs/db_id_async_resolver_impl/db_async_resolver_impl.cpp +++ b/ydb/core/fq/libs/db_id_async_resolver_impl/db_async_resolver_impl.cpp @@ -10,18 +10,18 @@ TDatabaseAsyncResolverImpl::TDatabaseAsyncResolverImpl( const NActors::TActorId& recipient, const TString& ydbMvpEndpoint, const TString& mdbGateway, - bool mdbTransformHost, + NYql::IMdbHostTransformer::TPtr&& mdbHostTransformer, const TString& traceId) : ActorSystem(actorSystem) , Recipient(recipient) , YdbMvpEndpoint(ydbMvpEndpoint) , MdbGateway(mdbGateway) - , MdbTransformHost(mdbTransformHost) + , MdbHostTransformer(std::move(mdbHostTransformer)) , TraceId(traceId) { } -TFuture<NYql::TDbResolverResponse> TDatabaseAsyncResolverImpl::ResolveIds(const TDatabaseAuthMap& ids) const +TFuture<NYql::TDatabaseResolverResponse> TDatabaseAsyncResolverImpl::ResolveIds(const TDatabaseAuthMap& ids) const { // Cloud database ids validataion for (const auto& kv: ids) { @@ -29,7 +29,7 @@ TFuture<NYql::TDbResolverResponse> TDatabaseAsyncResolverImpl::ResolveIds(const YQL_ENSURE(kv.first.first, "empty cluster name"); } - auto promise = NewPromise<NYql::TDbResolverResponse>(); + auto promise = NewPromise<NYql::TDatabaseResolverResponse>(); TDuration timeout = TDuration::Seconds(40); auto callback = MakeHolder<NYql::TRichActorFutureCallback<TEvents::TEvEndpointResponse>>( [promise] (TAutoPtr<NActors::TEventHandle<TEvents::TEvEndpointResponse>>& event) mutable { @@ -45,7 +45,7 @@ TFuture<NYql::TDbResolverResponse> TDatabaseAsyncResolverImpl::ResolveIds(const ActorSystem->Send(new NActors::IEventHandle(Recipient, callbackId, new TEvents::TEvEndpointRequest(ids, YdbMvpEndpoint, MdbGateway, - TraceId, MdbTransformHost))); + TraceId, MdbHostTransformer))); return promise.GetFuture(); } diff --git a/ydb/core/fq/libs/db_id_async_resolver_impl/db_async_resolver_impl.h b/ydb/core/fq/libs/db_id_async_resolver_impl/db_async_resolver_impl.h index dbb51e48b27..8feb0ecdcc8 100644 --- a/ydb/core/fq/libs/db_id_async_resolver_impl/db_async_resolver_impl.h +++ b/ydb/core/fq/libs/db_id_async_resolver_impl/db_async_resolver_impl.h @@ -1,6 +1,8 @@ #pragma once + #include <ydb/core/fq/libs/events/events.h> #include <ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver.h> +#include <ydb/library/yql/providers/common/db_id_async_resolver/mdb_host_transformer.h> #include <ydb/library/yql/providers/dq/actors/actor_helpers.h> namespace NFq { @@ -12,17 +14,17 @@ public: const NActors::TActorId& recipient, const TString& ydbMvpEndpoint, const TString& mdbGateway, - bool mdbTransformHost = false, + NYql::IMdbHostTransformer::TPtr&& mdbHostTransformer, const TString& traceId = "" ); - NThreading::TFuture<NYql::TDbResolverResponse> ResolveIds(const TDatabaseAuthMap& ids) const override; + NThreading::TFuture<NYql::TDatabaseResolverResponse> ResolveIds(const TDatabaseAuthMap& ids) const override; private: NActors::TActorSystem* ActorSystem; const NActors::TActorId Recipient; const TString YdbMvpEndpoint; const TString MdbGateway; - const bool MdbTransformHost = false; + NYql::IMdbHostTransformer::TPtr MdbHostTransformer; const TString TraceId; }; diff --git a/ydb/core/fq/libs/db_id_async_resolver_impl/mdb_host_transformer.cpp b/ydb/core/fq/libs/db_id_async_resolver_impl/mdb_host_transformer.cpp new file mode 100644 index 00000000000..8772a7bf131 --- /dev/null +++ b/ydb/core/fq/libs/db_id_async_resolver_impl/mdb_host_transformer.cpp @@ -0,0 +1,55 @@ +#include "mdb_host_transformer.h" + +namespace NFq { + // TMdbHostTransformerLegacy implements behavior required by YQL legacy ClickHouse provider + class TMdbHostTransformerLegacy: public NYql::IMdbHostTransformer { + TString ToEndpoint(const NYql::EDatabaseType databaseType, const TString& mdbHost) const override { + // Inherited from here + // https://a.yandex-team.ru/arcadia/ydb/core/fq/libs/actors/database_resolver.cpp?rev=r11819335#L27 + if (databaseType == NYql::EDatabaseType::ClickHouse) { + return mdbHost.substr(0, mdbHost.find('.')) + ".db.yandex.net:8443"; + } + + ythrow yexception() << TStringBuilder() << "Unexpected database type: " << int(databaseType); + } + }; + + NYql::IMdbHostTransformer::TPtr + MakeTMdbHostTransformerLegacy() { + return std::make_shared<TMdbHostTransformerLegacy>(); + } + + // TMdbHostTransformerGeneric implements behavior required by YQL Generic provider + // that interacts with data sources through a separate Connector service + class TMdbHostTransformerGeneric: public NYql::IMdbHostTransformer { + TString ToEndpoint(const NYql::EDatabaseType databaseType, const TString& mdbHost) const override { + switch (databaseType) { + case NYql::EDatabaseType::ClickHouse: + // TODO: https://st.yandex-team.ru/YQ-2170: support secure connections on 9440 + return mdbHost + ":9000"; + case NYql::EDatabaseType::PostgreSQL: + // https://cloud.yandex.ru/docs/managed-postgresql/operations/connect + return mdbHost + ":6432"; + default: + ythrow yexception() << TStringBuilder() << "Unexpected database type: " << int(databaseType); + }; + } + }; + + NYql::IMdbHostTransformer::TPtr + MakeTMdbHostTransformerGeneric() { + return std::make_shared<TMdbHostTransformerGeneric>(); + } + + // TMdbHostTransformerNoop just does nothing + class TMdbHostTransformerNoop: public NYql::IMdbHostTransformer { + TString ToEndpoint(const NYql::EDatabaseType, const TString& mdbHost) const override { + return mdbHost; + } + }; + + NYql::IMdbHostTransformer::TPtr + MakeTMdbHostTransformerNoop() { + return std::make_shared<TMdbHostTransformerNoop>(); + } +}
\ No newline at end of file diff --git a/ydb/core/fq/libs/db_id_async_resolver_impl/mdb_host_transformer.h b/ydb/core/fq/libs/db_id_async_resolver_impl/mdb_host_transformer.h new file mode 100644 index 00000000000..4a26bc6b654 --- /dev/null +++ b/ydb/core/fq/libs/db_id_async_resolver_impl/mdb_host_transformer.h @@ -0,0 +1,10 @@ +#pragma once + +#include <ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver.h> +#include <ydb/library/yql/providers/common/db_id_async_resolver/mdb_host_transformer.h> + +namespace NFq { + NYql::IMdbHostTransformer::TPtr MakeTMdbHostTransformerLegacy(); + NYql::IMdbHostTransformer::TPtr MakeTMdbHostTransformerGeneric(); + NYql::IMdbHostTransformer::TPtr MakeTMdbHostTransformerNoop(); +} diff --git a/ydb/core/fq/libs/db_id_async_resolver_impl/ya.make b/ydb/core/fq/libs/db_id_async_resolver_impl/ya.make index 364c1ecd51f..2ac916e03c6 100644 --- a/ydb/core/fq/libs/db_id_async_resolver_impl/ya.make +++ b/ydb/core/fq/libs/db_id_async_resolver_impl/ya.make @@ -2,6 +2,7 @@ LIBRARY() SRCS( db_async_resolver_impl.cpp + mdb_host_transformer.cpp ) PEERDIR( diff --git a/ydb/core/fq/libs/events/events.h b/ydb/core/fq/libs/events/events.h index 3d5e4e6e2c4..62bc65299a6 100644 --- a/ydb/core/fq/libs/events/events.h +++ b/ydb/core/fq/libs/events/events.h @@ -3,6 +3,7 @@ #include <ydb/library/yql/core/facade/yql_facade.h> #include <ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver.h> +#include <ydb/library/yql/providers/common/db_id_async_resolver/mdb_host_transformer.h> #include <ydb/library/yql/providers/dq/provider/yql_dq_gateway.h> #include <ydb/library/yql/public/issue/yql_issue.h> @@ -105,29 +106,29 @@ struct TEvents { }; struct TEvEndpointResponse : NActors::TEventLocal<TEvEndpointResponse, TEventIds::EvEndpointResponse> { - NYql::TDbResolverResponse DbResolverResponse; - explicit TEvEndpointResponse(NYql::TDbResolverResponse&& response) noexcept : DbResolverResponse(std::move(response)) {} + NYql::TDatabaseResolverResponse DbResolverResponse; + explicit TEvEndpointResponse(NYql::TDatabaseResolverResponse&& response) noexcept : DbResolverResponse(std::move(response)) {} }; struct TEvEndpointRequest : NActors::TEventLocal<TEvEndpointRequest, TEventIds::EvEndpointRequest> { - THashMap<std::pair<TString, NYql::DatabaseType>, NYql::TDatabaseAuth> DatabaseIds; // DbId, DatabaseType => database auth + const NYql::IDatabaseAsyncResolver::TDatabaseAuthMap DatabaseIds; TString YdbMvpEndpoint; TString MdbGateway; TString TraceId; - bool MdbTransformHost; + const NYql::IMdbHostTransformer::TPtr MdbHostTransformer; TEvEndpointRequest( - const THashMap<std::pair<TString, NYql::DatabaseType>, NYql::TDatabaseAuth>& databaseIds, + const NYql::IDatabaseAsyncResolver::TDatabaseAuthMap& databaseIds, const TString& ydbMvpEndpoint, const TString& mdbGateway, const TString& traceId, - bool mdbTransformHost) + const NYql::IMdbHostTransformer::TPtr& mdbHostTransformer) : DatabaseIds(databaseIds) , YdbMvpEndpoint(ydbMvpEndpoint) , MdbGateway(mdbGateway) , TraceId(traceId) - , MdbTransformHost(mdbTransformHost) + , MdbHostTransformer(mdbHostTransformer) { } }; diff --git a/ydb/core/fq/libs/test_connection/test_connection.cpp b/ydb/core/fq/libs/test_connection/test_connection.cpp index 1f4a3bc90fa..ec1e760769d 100644 --- a/ydb/core/fq/libs/test_connection/test_connection.cpp +++ b/ydb/core/fq/libs/test_connection/test_connection.cpp @@ -9,6 +9,7 @@ #include <ydb/core/fq/libs/common/util.h> #include <ydb/core/fq/libs/config/yq_issue.h> #include <ydb/core/fq/libs/db_id_async_resolver_impl/db_async_resolver_impl.h> +#include <ydb/core/fq/libs/db_id_async_resolver_impl/mdb_host_transformer.h> #include <ydb/core/fq/libs/control_plane_storage/config.h> #include <ydb/library/security/util.h> @@ -146,7 +147,8 @@ public: DbResolver = std::make_shared<NFq::TDatabaseAsyncResolverImpl>( NActors::TActivationContext::ActorSystem(), DatabaseResolverActor, CommonConfig.GetYdbMvpCloudEndpoint(), CommonConfig.GetMdbGateway(), - CommonConfig.GetMdbTransformHost()); + NFq::MakeTMdbHostTransformerGeneric() + ); Become(&TTestConnectionActor::StateFunc); } diff --git a/ydb/core/fq/libs/test_connection/test_data_streams.cpp b/ydb/core/fq/libs/test_connection/test_data_streams.cpp index 363b5a7a660..8e9fc1f02c5 100644 --- a/ydb/core/fq/libs/test_connection/test_data_streams.cpp +++ b/ydb/core/fq/libs/test_connection/test_data_streams.cpp @@ -25,9 +25,9 @@ struct TEvPrivate { static_assert(EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE), "expect EvEnd < EventSpaceEnd(TEvents::ES_PRIVATE)"); struct TEvResolveDbResponse : NActors::TEventLocal<TEvResolveDbResponse, EvResolveDbResponse> { - NYql::TDbResolverResponse Result; + NYql::TDatabaseResolverResponse Result; - TEvResolveDbResponse(const NYql::TDbResolverResponse& result) + TEvResolveDbResponse(const NYql::TDatabaseResolverResponse& result) : Result(result) {} }; @@ -145,14 +145,14 @@ private: SendOpenSession(); return; } - THashMap<std::pair<TString, NYql::DatabaseType>, NYql::TDatabaseAuth> ids; - ids[std::pair{ClusterConfig.GetDatabaseId(), NYql::DatabaseType::DataStreams}] = {StructuredToken, CommonConfig.GetUseBearerForYdb()}; + NYql::IDatabaseAsyncResolver::TDatabaseAuthMap ids; + ids[std::pair{ClusterConfig.GetDatabaseId(), NYql::EDatabaseType::DataStreams}] = {StructuredToken, CommonConfig.GetUseBearerForYdb()}; DbResolver->ResolveIds(ids).Apply([self=SelfId(), as=TActivationContext::ActorSystem()](const auto& future) { try { auto result = future.GetValue(); as->Send(new IEventHandle(self, self, new TEvPrivate::TEvResolveDbResponse(result), 0)); } catch (...) { - as->Send(new IEventHandle(self, self, new TEvPrivate::TEvResolveDbResponse(NYql::TDbResolverResponse{{}, false, NYql::TIssues{MakeErrorIssue(NFq::TIssuesIds::BAD_REQUEST, CurrentExceptionMessage())}}), 0)); + as->Send(new IEventHandle(self, self, new TEvPrivate::TEvResolveDbResponse(NYql::TDatabaseResolverResponse{{}, false, NYql::TIssues{MakeErrorIssue(NFq::TIssuesIds::BAD_REQUEST, CurrentExceptionMessage())}}), 0)); } }); } @@ -165,7 +165,7 @@ private: return; } - auto it = response.DatabaseId2Endpoint.find(std::pair{ClusterConfig.GetDatabaseId(), NYql::DatabaseType::DataStreams}); + auto it = response.DatabaseId2Endpoint.find(std::pair{ClusterConfig.GetDatabaseId(), NYql::EDatabaseType::DataStreams}); if (it == response.DatabaseId2Endpoint.end()) { TC_LOG_E(Scope << " " << User << " " << NKikimr::MaskTicket(Token) << " Test data streams connection: database is not found for database_id " << ClusterConfig.GetDatabaseId()); ReplyError(TStringBuilder{} << "Test data streams connection: database is not found for database_id " << ClusterConfig.GetDatabaseId()); diff --git a/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_io_discovery.cpp b/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_io_discovery.cpp index 8b7bc177d66..5b05314fd7b 100644 --- a/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_io_discovery.cpp +++ b/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_io_discovery.cpp @@ -15,7 +15,7 @@ using namespace NNodes; class TClickHouseIODiscoveryTransformer : public TGraphTransformerBase { -using TDbId2Endpoint = THashMap<std::pair<TString, NYql::DatabaseType>, NYql::TDbResolverResponse::TEndpoint>; +using TDbId2Endpoint = THashMap<std::pair<TString, NYql::EDatabaseType>, NYql::TDatabaseResolverResponse::TEndpoint>; public: TClickHouseIODiscoveryTransformer(TClickHouseState::TPtr state) @@ -31,7 +31,7 @@ public: if (!State_->DbResolver) return TStatus::Ok; - THashMap<std::pair<TString, NYql::DatabaseType>, NYql::TDatabaseAuth> ids; + THashMap<std::pair<TString, NYql::EDatabaseType>, NYql::TDatabaseAuth> ids; if (auto reads = FindNodes(input, [&](const TExprNode::TPtr& node) { const TExprBase nodeExpr(node); if (!nodeExpr.Maybe<TClRead>()) @@ -50,7 +50,7 @@ public: auto dbId = State_->Configuration->Endpoints[cluster].first; dbId = dbId.substr(0, dbId.find(':')); YQL_CLOG(DEBUG, ProviderClickHouse) << "Found dbId: " << dbId; - const auto idKey = std::make_pair(dbId, NYql::DatabaseType::ClickHouse); + const auto idKey = std::make_pair(dbId, NYql::EDatabaseType::ClickHouse); const auto iter = State_->DatabaseIds.find(idKey); if (iter != State_->DatabaseIds.end()) { YQL_CLOG(DEBUG, ProviderClickHouse) << "Resolve CH id: " << dbId; @@ -64,7 +64,7 @@ public: return TStatus::Ok; } - const std::weak_ptr<NYql::TDbResolverResponse> response = DbResolverResponse_; + const std::weak_ptr<NYql::TDatabaseResolverResponse> response = DbResolverResponse_; AsyncFuture_ = State_->DbResolver->ResolveIds(ids).Apply([response](auto future) { if (const auto res = response.lock()) *res = std::move(future.ExtractValue()); @@ -84,7 +84,7 @@ public: return TStatus::Error; } FullResolvedIds_.insert(DbResolverResponse_->DatabaseId2Endpoint.begin(), DbResolverResponse_->DatabaseId2Endpoint.end()); - DbResolverResponse_ = std::make_shared<NYql::TDbResolverResponse>(); + DbResolverResponse_ = std::make_shared<NYql::TDatabaseResolverResponse>(); YQL_CLOG(DEBUG, ProviderClickHouse) << "ResolvedIds: " << FullResolvedIds_.size(); auto& endpoints = State_->Configuration->Endpoints; const auto& id2Clusters = State_->Configuration->DbId2Clusters; @@ -113,14 +113,14 @@ public: void Rewind() final { AsyncFuture_ = {}; FullResolvedIds_.clear(); - DbResolverResponse_ = std::make_shared<NYql::TDbResolverResponse>(); + DbResolverResponse_ = std::make_shared<NYql::TDatabaseResolverResponse>(); } private: const TClickHouseState::TPtr State_; NThreading::TFuture<void> AsyncFuture_; TDbId2Endpoint FullResolvedIds_; - std::shared_ptr<NYql::TDbResolverResponse> DbResolverResponse_ = std::make_shared<NYql::TDbResolverResponse>(); + std::shared_ptr<NYql::TDatabaseResolverResponse> DbResolverResponse_ = std::make_shared<NYql::TDatabaseResolverResponse>(); }; } diff --git a/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_provider.h b/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_provider.h index 849ed534ed8..650f4a8df75 100644 --- a/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_provider.h +++ b/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_provider.h @@ -27,7 +27,7 @@ struct TClickHouseState : public TThrRefBase TTypeAnnotationContext* Types = nullptr; TClickHouseConfiguration::TPtr Configuration = MakeIntrusive<TClickHouseConfiguration>(); const NKikimr::NMiniKQL::IFunctionRegistry* FunctionRegistry = nullptr; - THashMap<std::pair<TString, NYql::DatabaseType>, NYql::TDatabaseAuth> DatabaseIds; + THashMap<std::pair<TString, NYql::EDatabaseType>, NYql::TDatabaseAuth> DatabaseIds; std::shared_ptr<NYql::IDatabaseAsyncResolver> DbResolver; }; diff --git a/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_settings.h b/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_settings.h index bcf1acd4f2d..fc1acfff7f4 100644 --- a/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_settings.h +++ b/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_settings.h @@ -23,7 +23,7 @@ struct TClickHouseConfiguration : public TClickHouseSettings, public NCommon::TS void Init( const TProtoConfig& config, const std::shared_ptr<NYql::IDatabaseAsyncResolver> dbResolver, - THashMap<std::pair<TString, NYql::DatabaseType>, NYql::TDatabaseAuth>& databaseIds) + THashMap<std::pair<TString, NYql::EDatabaseType>, NYql::TDatabaseAuth>& databaseIds) { TVector<TString> clusters(Reserve(config.ClusterMappingSize())); for (auto& cluster: config.GetClusterMapping()) { @@ -39,7 +39,7 @@ struct TClickHouseConfiguration : public TClickHouseSettings, public NCommon::TS if (dbResolver) { YQL_CLOG(DEBUG, ProviderClickHouse) << "Settings: clusterName = " << cluster.GetName() << ", clusterDbId = " << cluster.GetId() << ", cluster.GetCluster(): " << cluster.GetCluster() << ", HasCluster: " << (cluster.HasCluster() ? "TRUE" : "FALSE") ; - databaseIds[std::make_pair(cluster.GetId(), NYql::DatabaseType::ClickHouse)] = + databaseIds[std::make_pair(cluster.GetId(), NYql::EDatabaseType::ClickHouse)] = NYql::TDatabaseAuth{cluster.GetCHToken(), /*AddBearer=*/false}; if (cluster.GetId()) { DbId2Clusters[cluster.GetId()].emplace_back(cluster.GetName()); diff --git a/ydb/library/yql/providers/common/db_id_async_resolver/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/providers/common/db_id_async_resolver/CMakeLists.darwin-x86_64.txt index 7bbfc9a4c1a..df50b83d349 100644 --- a/ydb/library/yql/providers/common/db_id_async_resolver/CMakeLists.darwin-x86_64.txt +++ b/ydb/library/yql/providers/common/db_id_async_resolver/CMakeLists.darwin-x86_64.txt @@ -12,5 +12,6 @@ target_link_libraries(providers-common-db_id_async_resolver INTERFACE contrib-libs-cxxsupp yutil cpp-threading-future + connector-api-common yql-public-issue ) diff --git a/ydb/library/yql/providers/common/db_id_async_resolver/CMakeLists.linux-aarch64.txt b/ydb/library/yql/providers/common/db_id_async_resolver/CMakeLists.linux-aarch64.txt index 392cc3fcaea..6578ba76e03 100644 --- a/ydb/library/yql/providers/common/db_id_async_resolver/CMakeLists.linux-aarch64.txt +++ b/ydb/library/yql/providers/common/db_id_async_resolver/CMakeLists.linux-aarch64.txt @@ -13,5 +13,6 @@ target_link_libraries(providers-common-db_id_async_resolver INTERFACE contrib-libs-cxxsupp yutil cpp-threading-future + connector-api-common yql-public-issue ) diff --git a/ydb/library/yql/providers/common/db_id_async_resolver/CMakeLists.linux-x86_64.txt b/ydb/library/yql/providers/common/db_id_async_resolver/CMakeLists.linux-x86_64.txt index 392cc3fcaea..6578ba76e03 100644 --- a/ydb/library/yql/providers/common/db_id_async_resolver/CMakeLists.linux-x86_64.txt +++ b/ydb/library/yql/providers/common/db_id_async_resolver/CMakeLists.linux-x86_64.txt @@ -13,5 +13,6 @@ target_link_libraries(providers-common-db_id_async_resolver INTERFACE contrib-libs-cxxsupp yutil cpp-threading-future + connector-api-common yql-public-issue ) diff --git a/ydb/library/yql/providers/common/db_id_async_resolver/CMakeLists.windows-x86_64.txt b/ydb/library/yql/providers/common/db_id_async_resolver/CMakeLists.windows-x86_64.txt index 7bbfc9a4c1a..df50b83d349 100644 --- a/ydb/library/yql/providers/common/db_id_async_resolver/CMakeLists.windows-x86_64.txt +++ b/ydb/library/yql/providers/common/db_id_async_resolver/CMakeLists.windows-x86_64.txt @@ -12,5 +12,6 @@ target_link_libraries(providers-common-db_id_async_resolver INTERFACE contrib-libs-cxxsupp yutil cpp-threading-future + connector-api-common yql-public-issue ) diff --git a/ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver.h b/ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver.h index 25db8049952..4a58980d9d0 100644 --- a/ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver.h +++ b/ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver.h @@ -2,18 +2,40 @@ #include <library/cpp/threading/future/future.h> #include <util/string/builder.h> +#include <ydb/library/yql/providers/generic/connector/api/common/data_source.pb.h> #include <ydb/library/yql/public/issue/yql_issue.h> namespace NYql { -enum class DatabaseType { +enum class EDatabaseType { Ydb, ClickHouse, DataStreams, ObjectStorage, - Generic + PostgreSQL }; +inline EDatabaseType DataSourceKindToDatabaseType(NConnector::NApi::EDataSourceKind dataSourceKind) { + switch (dataSourceKind) { + case NConnector::NApi::EDataSourceKind::POSTGRESQL: + return EDatabaseType::PostgreSQL; + case NConnector::NApi::EDataSourceKind::CLICKHOUSE: + return EDatabaseType::ClickHouse; + default: + ythrow yexception() << TStringBuf() << "Unknown data source kind: " << NConnector::NApi::EDataSourceKind_Name(dataSourceKind); + } +} + +inline TString DatabaseTypeToString(EDatabaseType databaseType) { + switch (databaseType) { + case EDatabaseType::ClickHouse: + return "clickhouse"; + case EDatabaseType::PostgreSQL: + return "postgresql"; + default: + ythrow yexception() << TStringBuf() << "Unknown database type: " << int(databaseType); + } +} struct TDatabaseAuth { TString StructuredToken; @@ -27,7 +49,7 @@ struct TDatabaseAuth { } }; -struct TDbResolverResponse { +struct TDatabaseResolverResponse { struct TEndpoint { std::tuple<TString, ui32> ParseHostPort() const { @@ -47,11 +69,11 @@ struct TDbResolverResponse { bool Secure = false; }; - using TDatabaseEndpointsMap = THashMap<std::pair<TString, DatabaseType>, TEndpoint>; + using TDatabaseEndpointsMap = THashMap<std::pair<TString, EDatabaseType>, TEndpoint>; - TDbResolverResponse() = default; + TDatabaseResolverResponse() = default; - TDbResolverResponse( + TDatabaseResolverResponse( TDatabaseEndpointsMap&& databaseId2Endpoint, bool success = false, const NYql::TIssues& issues = {}) @@ -66,9 +88,9 @@ struct TDbResolverResponse { class IDatabaseAsyncResolver { public: - using TDatabaseAuthMap = THashMap<std::pair<TString, DatabaseType>, NYql::TDatabaseAuth>; + using TDatabaseAuthMap = THashMap<std::pair<TString, EDatabaseType>, NYql::TDatabaseAuth>; - virtual NThreading::TFuture<NYql::TDbResolverResponse> ResolveIds(const TDatabaseAuthMap& ids) const = 0; + virtual NThreading::TFuture<NYql::TDatabaseResolverResponse> ResolveIds(const TDatabaseAuthMap& ids) const = 0; virtual ~IDatabaseAsyncResolver() = default; }; diff --git a/ydb/library/yql/providers/common/db_id_async_resolver/mdb_host_transformer.h b/ydb/library/yql/providers/common/db_id_async_resolver/mdb_host_transformer.h new file mode 100644 index 00000000000..188f739f984 --- /dev/null +++ b/ydb/library/yql/providers/common/db_id_async_resolver/mdb_host_transformer.h @@ -0,0 +1,17 @@ +#pragma once + +#include "db_async_resolver.h" + +namespace NYql { + // IMdbHostTransformer is responsible for transforming the managed database instance host name + // into endpoint (`fqdn:port`) to establish network connection with data source. + // The host names are obtained from MDB API, for example: + // https://cloud.yandex.ru/docs/managed-clickhouse/api-ref/Cluster/listHosts + class IMdbHostTransformer { + public: + using TPtr = std::shared_ptr<IMdbHostTransformer>; + + virtual TString ToEndpoint(const NYql::EDatabaseType databaseType, const TString& mdbHost) const = 0; + virtual ~IMdbHostTransformer() = default; + }; +} diff --git a/ydb/library/yql/providers/common/db_id_async_resolver/ya.make b/ydb/library/yql/providers/common/db_id_async_resolver/ya.make index 2cf88d4759e..9e226a741ad 100644 --- a/ydb/library/yql/providers/common/db_id_async_resolver/ya.make +++ b/ydb/library/yql/providers/common/db_id_async_resolver/ya.make @@ -2,10 +2,12 @@ LIBRARY() SRCS( db_async_resolver.h + mdb_host_transformer.h ) PEERDIR( library/cpp/threading/future + ydb/library/yql/providers/generic/connector/api/common ydb/library/yql/public/issue ) diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_datasource_type_ann.cpp b/ydb/library/yql/providers/generic/provider/yql_generic_datasource_type_ann.cpp index f05688124d5..32c4c52068b 100644 --- a/ydb/library/yql/providers/generic/provider/yql_generic_datasource_type_ann.cpp +++ b/ydb/library/yql/providers/generic/provider/yql_generic_datasource_type_ann.cpp @@ -51,7 +51,7 @@ namespace NYql { ctx.MakeType<TItemExprType>(item->GetName(), ctx.MakeType<TBlockExprType>(item->GetItemType()))); } - // FIXME: + // FIXME: YQ-2190 // Clickhouse provider used to work with multiple tables simultaneously; // I don't know what to do with others. break; diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_io_discovery.cpp b/ydb/library/yql/providers/generic/provider/yql_generic_io_discovery.cpp index aa8ba068eaa..279fe4fb606 100644 --- a/ydb/library/yql/providers/generic/provider/yql_generic_io_discovery.cpp +++ b/ydb/library/yql/providers/generic/provider/yql_generic_io_discovery.cpp @@ -29,7 +29,7 @@ namespace NYql { if (!State_->DbResolver) return TStatus::Ok; - THashMap<std::pair<TString, NYql::DatabaseType>, NYql::TDatabaseAuth> ids; + THashMap<std::pair<TString, NYql::EDatabaseType>, NYql::TDatabaseAuth> ids; if (auto reads = FindNodes(input, [&](const TExprNode::TPtr& node) { const TExprBase nodeExpr(node); @@ -48,10 +48,11 @@ namespace NYql { const auto clusterName = read.DataSource().Cluster().StringValue(); YQL_CLOG(DEBUG, ProviderGeneric) << "found cluster name: " << clusterName; - auto databaseId = State_->Configuration->ClusterNamesToClusterConfigs[clusterName].GetDatabaseId(); + auto& cluster = State_->Configuration->ClusterNamesToClusterConfigs[clusterName]; + auto databaseId = cluster.GetDatabaseId(); if (databaseId) { YQL_CLOG(DEBUG, ProviderGeneric) << "found database id: " << databaseId; - const auto idKey = std::make_pair(databaseId, NYql::DatabaseType::Generic); + const auto idKey = std::make_pair(databaseId, DataSourceKindToDatabaseType(cluster.GetKind())); const auto iter = State_->DatabaseIds.find(idKey); if (iter != State_->DatabaseIds.end()) { YQL_CLOG(DEBUG, ProviderGeneric) << "resolve database id: " << databaseId; @@ -68,7 +69,7 @@ namespace NYql { // FIXME: overengineered code - instead of using weak_ptr, directly copy shared_ptr in callback in this way: // Apply([response = DbResolverResponse_](...)) - const std::weak_ptr<NYql::TDbResolverResponse> response = DbResolverResponse_; + const std::weak_ptr<NYql::TDatabaseResolverResponse> response = DbResolverResponse_; AsyncFuture_ = State_->DbResolver->ResolveIds(ids).Apply([response](auto future) { if (const auto res = response.lock()) { *res = std::move(future.ExtractValue()); @@ -93,7 +94,7 @@ namespace NYql { // Copy resolver results and reallocate pointer auto databaseIdsToEndpointsResolved = std::move(DbResolverResponse_->DatabaseId2Endpoint); - DbResolverResponse_ = std::make_shared<NYql::TDbResolverResponse>(); + DbResolverResponse_ = std::make_shared<NYql::TDatabaseResolverResponse>(); // Modify cluster configs with resolved ids return ModifyClusterConfigs(databaseIdsToEndpointsResolved, ctx); @@ -101,11 +102,11 @@ namespace NYql { void Rewind() final { AsyncFuture_ = {}; - DbResolverResponse_.reset(new NYql::TDbResolverResponse); + DbResolverResponse_.reset(new NYql::TDatabaseResolverResponse); } private: - TStatus ModifyClusterConfigs(const TDbResolverResponse::TDatabaseEndpointsMap& databaseIdsToEndpoints, TExprContext& ctx) { + TStatus ModifyClusterConfigs(const TDatabaseResolverResponse::TDatabaseEndpointsMap& databaseIdsToEndpoints, TExprContext& ctx) { const auto& databaseIdsToClusterNames = State_->Configuration->DatabaseIdsToClusterNames; auto& clusterNamesToClusterConfigs = State_->Configuration->ClusterNamesToClusterConfigs; @@ -147,7 +148,7 @@ namespace NYql { const TGenericState::TPtr State_; NThreading::TFuture<void> AsyncFuture_; - std::shared_ptr<NYql::TDbResolverResponse> DbResolverResponse_ = std::make_shared<NYql::TDbResolverResponse>(); + std::shared_ptr<NYql::TDatabaseResolverResponse> DbResolverResponse_ = std::make_shared<NYql::TDatabaseResolverResponse>(); }; } diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_settings.h b/ydb/library/yql/providers/generic/provider/yql_generic_settings.h index b493fc3d505..b6e043e4f2d 100644 --- a/ydb/library/yql/providers/generic/provider/yql_generic_settings.h +++ b/ydb/library/yql/providers/generic/provider/yql_generic_settings.h @@ -58,7 +58,7 @@ namespace NYql { if (dbResolver && databaseId) { const auto token = MakeStructuredToken(cluster, credentials); - databaseIds[std::make_pair(databaseId, NYql::DatabaseType::Generic)] = NYql::TDatabaseAuth{token, /*AddBearer=*/true}; + databaseIds[std::make_pair(databaseId, DataSourceKindToDatabaseType(cluster.GetKind()))] = NYql::TDatabaseAuth{token, /*AddBearer=*/true}; DatabaseIdsToClusterNames[databaseId].emplace_back(clusterName); YQL_CLOG(DEBUG, ProviderGeneric) << "database id '" << databaseId << "' added to mapping"; diff --git a/ydb/library/yql/providers/pq/provider/yql_pq_datasink_io_discovery.cpp b/ydb/library/yql/providers/pq/provider/yql_pq_datasink_io_discovery.cpp index e5437828e24..9608c056021 100644 --- a/ydb/library/yql/providers/pq/provider/yql_pq_datasink_io_discovery.cpp +++ b/ydb/library/yql/providers/pq/provider/yql_pq_datasink_io_discovery.cpp @@ -12,7 +12,7 @@ namespace { using namespace NNodes; class TPqDataSinkIODiscoveryTransformer : public TGraphTransformerBase { -using TDbId2Endpoint = THashMap<std::pair<TString, NYql::DatabaseType>, NYql::TDbResolverResponse::TEndpoint>; +using TDbId2Endpoint = THashMap<std::pair<TString, NYql::EDatabaseType>, NYql::TDatabaseResolverResponse::TEndpoint>; public: explicit TPqDataSinkIODiscoveryTransformer(TPqState::TPtr state) : State_(state) @@ -28,13 +28,13 @@ public: if (!State_->DbResolver) return TStatus::Ok; - THashMap<std::pair<TString, NYql::DatabaseType>, NYql::TDatabaseAuth> ids; + THashMap<std::pair<TString, NYql::EDatabaseType>, NYql::TDatabaseAuth> ids; FindYdsDbIdsForResolving(State_, input, ids); if (ids.empty()) return TStatus::Ok; - const std::weak_ptr<NYql::TDbResolverResponse> response = DbResolverResponse_; + const std::weak_ptr<NYql::TDatabaseResolverResponse> response = DbResolverResponse_; AsyncFuture_ = State_->DbResolver->ResolveIds(ids).Apply([response](auto future) { if (const auto res = response.lock()) @@ -55,7 +55,7 @@ public: return TStatus::Error; } FullResolvedIds_.insert(DbResolverResponse_->DatabaseId2Endpoint.begin(), DbResolverResponse_->DatabaseId2Endpoint.end()); - DbResolverResponse_ = std::make_shared<NYql::TDbResolverResponse>(); + DbResolverResponse_ = std::make_shared<NYql::TDatabaseResolverResponse>(); FillSettingsWithResolvedYdsIds(State_, FullResolvedIds_); return TStatus::Ok; } @@ -63,14 +63,14 @@ public: void Rewind() final { AsyncFuture_ = {}; FullResolvedIds_.clear(); - DbResolverResponse_ = std::make_shared<NYql::TDbResolverResponse>(); + DbResolverResponse_ = std::make_shared<NYql::TDatabaseResolverResponse>(); } private: const TPqState::TPtr State_; NThreading::TFuture<void> AsyncFuture_; TDbId2Endpoint FullResolvedIds_; - std::shared_ptr<NYql::TDbResolverResponse> DbResolverResponse_ = std::make_shared<NYql::TDbResolverResponse>(); + std::shared_ptr<NYql::TDatabaseResolverResponse> DbResolverResponse_ = std::make_shared<NYql::TDatabaseResolverResponse>(); }; } diff --git a/ydb/library/yql/providers/pq/provider/yql_pq_helpers.cpp b/ydb/library/yql/providers/pq/provider/yql_pq_helpers.cpp index 7bc56eef366..d845e40e90b 100644 --- a/ydb/library/yql/providers/pq/provider/yql_pq_helpers.cpp +++ b/ydb/library/yql/providers/pq/provider/yql_pq_helpers.cpp @@ -29,7 +29,7 @@ TCoNameValueTupleList BuildTopicPropsList(const TPqState::TTopicMeta& meta, TPos void FindYdsDbIdsForResolving( const TPqState::TPtr& state, TExprNode::TPtr input, - THashMap<std::pair<TString, NYql::DatabaseType>, NYql::TDatabaseAuth>& ids) + THashMap<std::pair<TString, NYql::EDatabaseType>, NYql::TDatabaseAuth>& ids) { if (auto pqNodes = FindNodes(input, [&](const TExprNode::TPtr& node) { if (auto maybePqRead = TMaybeNode<TPqRead>(node)) { @@ -67,7 +67,7 @@ void FindYdsDbIdsForResolving( if (!foundSetting->second.DatabaseId) continue; YQL_CLOG(INFO, ProviderPq) << "Resolve YDS id: " << foundSetting->second.DatabaseId; - const auto idKey = std::make_pair(foundSetting->second.DatabaseId, NYql::DatabaseType::DataStreams); + const auto idKey = std::make_pair(foundSetting->second.DatabaseId, NYql::EDatabaseType::DataStreams); const auto foundDbId = state->DatabaseIds.find(idKey); if (foundDbId != state->DatabaseIds.end()) { ids[idKey] = foundDbId->second; @@ -79,7 +79,7 @@ void FindYdsDbIdsForResolving( void FillSettingsWithResolvedYdsIds( const TPqState::TPtr& state, - const THashMap<std::pair<TString, NYql::DatabaseType>, NYql::TDbResolverResponse::TEndpoint>& fullResolvedIds) + const TDatabaseResolverResponse::TDatabaseEndpointsMap& fullResolvedIds) { YQL_CLOG(INFO, ProviderPq) << "FullResolvedIds size: " << fullResolvedIds.size(); auto& clusters = state->Configuration->ClustersConfigurationSettings; diff --git a/ydb/library/yql/providers/pq/provider/yql_pq_helpers.h b/ydb/library/yql/providers/pq/provider/yql_pq_helpers.h index 7804e63a34c..8cd752b3bbe 100644 --- a/ydb/library/yql/providers/pq/provider/yql_pq_helpers.h +++ b/ydb/library/yql/providers/pq/provider/yql_pq_helpers.h @@ -15,10 +15,10 @@ NNodes::TCoNameValueTupleList BuildTopicPropsList(const TPqState::TTopicMeta& me void FindYdsDbIdsForResolving( const TPqState::TPtr& state, TExprNode::TPtr input, - THashMap<std::pair<TString, NYql::DatabaseType>, NYql::TDatabaseAuth>& ids); + THashMap<std::pair<TString, NYql::EDatabaseType>, NYql::TDatabaseAuth>& ids); void FillSettingsWithResolvedYdsIds( const TPqState::TPtr& state, - const THashMap<std::pair<TString, NYql::DatabaseType>, NYql::TDbResolverResponse::TEndpoint>& fullResolvedIds); + const TDatabaseResolverResponse::TDatabaseEndpointsMap& fullResolvedIds); } // namespace NYql diff --git a/ydb/library/yql/providers/pq/provider/yql_pq_io_discovery.cpp b/ydb/library/yql/providers/pq/provider/yql_pq_io_discovery.cpp index 4eef17a78d7..cc6939426e1 100644 --- a/ydb/library/yql/providers/pq/provider/yql_pq_io_discovery.cpp +++ b/ydb/library/yql/providers/pq/provider/yql_pq_io_discovery.cpp @@ -13,7 +13,7 @@ using namespace NNodes; class TPqIODiscoveryTransformer : public TGraphTransformerBase { -using TDbId2Endpoint = THashMap<std::pair<TString, NYql::DatabaseType>, NYql::TDbResolverResponse::TEndpoint>; +using TDbId2Endpoint = THashMap<std::pair<TString, NYql::EDatabaseType>, NYql::TDatabaseResolverResponse::TEndpoint>; public: explicit TPqIODiscoveryTransformer(TPqState::TPtr state) @@ -30,13 +30,13 @@ public: if (!State_->DbResolver) return TStatus::Ok; - THashMap<std::pair<TString, NYql::DatabaseType>, NYql::TDatabaseAuth> ids; + THashMap<std::pair<TString, NYql::EDatabaseType>, NYql::TDatabaseAuth> ids; FindYdsDbIdsForResolving(State_, input, ids); if (ids.empty()) return TStatus::Ok; - const std::weak_ptr<NYql::TDbResolverResponse> response = DbResolverResponse_; + const std::weak_ptr<NYql::TDatabaseResolverResponse> response = DbResolverResponse_; AsyncFuture_ = State_->DbResolver->ResolveIds(ids).Apply([response](auto future) { if (const auto res = response.lock()) @@ -57,7 +57,7 @@ public: return TStatus::Error; } FullResolvedIds_.insert(DbResolverResponse_->DatabaseId2Endpoint.begin(), DbResolverResponse_->DatabaseId2Endpoint.end()); - DbResolverResponse_ = std::make_shared<NYql::TDbResolverResponse>(); + DbResolverResponse_ = std::make_shared<NYql::TDatabaseResolverResponse>(); FillSettingsWithResolvedYdsIds(State_, FullResolvedIds_); return TStatus::Ok; } @@ -65,14 +65,14 @@ public: void Rewind() final { AsyncFuture_ = {}; FullResolvedIds_.clear(); - DbResolverResponse_ = std::make_shared<NYql::TDbResolverResponse>(); + DbResolverResponse_ = std::make_shared<NYql::TDatabaseResolverResponse>(); } private: const TPqState::TPtr State_; NThreading::TFuture<void> AsyncFuture_; TDbId2Endpoint FullResolvedIds_; - std::shared_ptr<NYql::TDbResolverResponse> DbResolverResponse_ = std::make_shared<NYql::TDbResolverResponse>(); + std::shared_ptr<NYql::TDatabaseResolverResponse> DbResolverResponse_ = std::make_shared<NYql::TDatabaseResolverResponse>(); }; } diff --git a/ydb/library/yql/providers/pq/provider/yql_pq_provider.h b/ydb/library/yql/providers/pq/provider/yql_pq_provider.h index fa6d6077ec8..9819e3f3c5f 100644 --- a/ydb/library/yql/providers/pq/provider/yql_pq_provider.h +++ b/ydb/library/yql/providers/pq/provider/yql_pq_provider.h @@ -52,7 +52,7 @@ public: const NKikimr::NMiniKQL::IFunctionRegistry* FunctionRegistry = nullptr; IPqGateway::TPtr Gateway; THolder<IDqIntegration> DqIntegration; - THashMap<std::pair<TString, NYql::DatabaseType>, NYql::TDatabaseAuth> DatabaseIds; + THashMap<std::pair<TString, NYql::EDatabaseType>, NYql::TDatabaseAuth> DatabaseIds; std::shared_ptr<NYql::IDatabaseAsyncResolver> DbResolver; }; diff --git a/ydb/library/yql/providers/pq/provider/yql_pq_settings.cpp b/ydb/library/yql/providers/pq/provider/yql_pq_settings.cpp index ea2d32abd9e..c424fa9d0e9 100644 --- a/ydb/library/yql/providers/pq/provider/yql_pq_settings.cpp +++ b/ydb/library/yql/providers/pq/provider/yql_pq_settings.cpp @@ -18,7 +18,7 @@ void TPqConfiguration::Init( const TPqGatewayConfig& config, TIntrusivePtr<TTypeAnnotationContext> typeCtx, const std::shared_ptr<NYql::IDatabaseAsyncResolver> dbResolver, - THashMap<std::pair<TString, NYql::DatabaseType>, NYql::TDatabaseAuth>& databaseIds) + THashMap<std::pair<TString, NYql::EDatabaseType>, NYql::TDatabaseAuth>& databaseIds) { TVector<TString> clusters(Reserve(config.ClusterMappingSize())); for (auto& cluster: config.GetClusterMapping()) { @@ -52,7 +52,7 @@ void TPqConfiguration::Init( YQL_CLOG(DEBUG, ProviderPq) << "Settings: clusterName = " << cluster.GetName() << ", clusterDbId = " << cluster.GetDatabaseId() << ", cluster.GetEndpoint(): " << cluster.GetEndpoint() << ", HasEndpoint = " << (cluster.HasEndpoint() ? "TRUE" : "FALSE") ; if (cluster.GetDatabaseId()) { - databaseIds[std::make_pair(cluster.GetDatabaseId(), NYql::DatabaseType::DataStreams)] = + databaseIds[std::make_pair(cluster.GetDatabaseId(), NYql::EDatabaseType::DataStreams)] = NYql::TDatabaseAuth{structuredTokenJson, cluster.GetAddBearerToToken()}; DbId2Clusters[cluster.GetDatabaseId()].emplace_back(cluster.GetName()); YQL_CLOG(DEBUG, ProviderPq) << "Add dbId: " << cluster.GetDatabaseId() << " to DbId2Clusters"; diff --git a/ydb/library/yql/providers/pq/provider/yql_pq_settings.h b/ydb/library/yql/providers/pq/provider/yql_pq_settings.h index 382af9c82b7..a720506ef12 100644 --- a/ydb/library/yql/providers/pq/provider/yql_pq_settings.h +++ b/ydb/library/yql/providers/pq/provider/yql_pq_settings.h @@ -41,7 +41,7 @@ struct TPqConfiguration : public TPqSettings, public NCommon::TSettingDispatcher const TPqGatewayConfig& config, TIntrusivePtr<TTypeAnnotationContext> typeCtx, const std::shared_ptr<NYql::IDatabaseAsyncResolver> dbResolver, - THashMap<std::pair<TString, NYql::DatabaseType>, NYql::TDatabaseAuth>& databaseIds); + THashMap<std::pair<TString, NYql::EDatabaseType>, NYql::TDatabaseAuth>& databaseIds); TString GetDatabaseForTopic(const TString& cluster) const; diff --git a/ydb/library/yql/providers/ydb/provider/yql_ydb_io_discovery.cpp b/ydb/library/yql/providers/ydb/provider/yql_ydb_io_discovery.cpp index 1aea23417fe..28ecf98338e 100644 --- a/ydb/library/yql/providers/ydb/provider/yql_ydb_io_discovery.cpp +++ b/ydb/library/yql/providers/ydb/provider/yql_ydb_io_discovery.cpp @@ -17,7 +17,7 @@ namespace { using namespace NNodes; class TYdbIODiscoveryTransformer : public TGraphTransformerBase { -using TDbId2Endpoint = THashMap<std::pair<TString, NYql::DatabaseType>, NYql::TDbResolverResponse::TEndpoint>; +using TDbId2Endpoint = THashMap<std::pair<TString, NYql::EDatabaseType>, NYql::TDatabaseResolverResponse::TEndpoint>; public: TYdbIODiscoveryTransformer(TYdbState::TPtr state) : State_(std::move(state)) @@ -32,7 +32,7 @@ public: if (!State_->DbResolver) return TStatus::Ok; - THashMap<std::pair<TString, NYql::DatabaseType>, NYql::TDatabaseAuth> ids; + THashMap<std::pair<TString, NYql::EDatabaseType>, NYql::TDatabaseAuth> ids; if (auto reads = FindNodes(input, [&](const TExprNode::TPtr& node) { const TExprBase nodeExpr(node); if (!nodeExpr.Maybe<TYdbRead>()) @@ -47,7 +47,7 @@ public: const TYdbRead read(node); const auto& cluster = read.DataSource().Cluster().StringValue(); const auto& dbId = State_->Configuration->Clusters[cluster].DatabaseId; - const auto idKey = std::make_pair(dbId, NYql::DatabaseType::Ydb); + const auto idKey = std::make_pair(dbId, NYql::EDatabaseType::Ydb); const auto iter = State_->DatabaseIds.find(idKey); if (iter != State_->DatabaseIds.end()) { ids[idKey] = iter->second; @@ -57,7 +57,7 @@ public: if (ids.empty()) { return TStatus::Ok; } - const std::weak_ptr<NYql::TDbResolverResponse> response = DbResolverResponse_; + const std::weak_ptr<NYql::TDatabaseResolverResponse> response = DbResolverResponse_; AsyncFuture_ = State_->DbResolver->ResolveIds(ids).Apply([response](auto future) { if (const auto res = response.lock()) @@ -78,7 +78,7 @@ public: return TStatus::Error; } FullResolvedIds_.insert(DbResolverResponse_->DatabaseId2Endpoint.begin(), DbResolverResponse_->DatabaseId2Endpoint.end()); - DbResolverResponse_ = std::make_shared<NYql::TDbResolverResponse>(); + DbResolverResponse_ = std::make_shared<NYql::TDatabaseResolverResponse>(); auto& clusters = State_->Configuration->Clusters; const auto& id2Clusters = State_->Configuration->DbId2Clusters; for (const auto& [dbIdWithType, info] : FullResolvedIds_) { @@ -100,14 +100,14 @@ public: void Rewind() final { AsyncFuture_ = {}; FullResolvedIds_.clear(); - DbResolverResponse_ = std::make_shared<NYql::TDbResolverResponse>(); + DbResolverResponse_ = std::make_shared<NYql::TDatabaseResolverResponse>(); } private: const TYdbState::TPtr State_; NThreading::TFuture<void> AsyncFuture_; TDbId2Endpoint FullResolvedIds_; - std::shared_ptr<NYql::TDbResolverResponse> DbResolverResponse_ = std::make_shared<NYql::TDbResolverResponse>(); + std::shared_ptr<NYql::TDatabaseResolverResponse> DbResolverResponse_ = std::make_shared<NYql::TDatabaseResolverResponse>(); }; } diff --git a/ydb/library/yql/providers/ydb/provider/yql_ydb_provider.h b/ydb/library/yql/providers/ydb/provider/yql_ydb_provider.h index 1135c62753a..d5b00a35036 100644 --- a/ydb/library/yql/providers/ydb/provider/yql_ydb_provider.h +++ b/ydb/library/yql/providers/ydb/provider/yql_ydb_provider.h @@ -33,7 +33,7 @@ struct TYdbState : public TThrRefBase TYdbConfiguration::TPtr Configuration = MakeIntrusive<TYdbConfiguration>(); const NKikimr::NMiniKQL::IFunctionRegistry* FunctionRegistry = nullptr; ISecuredServiceAccountCredentialsFactory::TPtr CredentialsFactory; - THashMap<std::pair<TString, NYql::DatabaseType>, NYql::TDatabaseAuth> DatabaseIds; + THashMap<std::pair<TString, NYql::EDatabaseType>, NYql::TDatabaseAuth> DatabaseIds; std::shared_ptr<NYql::IDatabaseAsyncResolver> DbResolver; }; diff --git a/ydb/library/yql/providers/ydb/provider/yql_ydb_settings.cpp b/ydb/library/yql/providers/ydb/provider/yql_ydb_settings.cpp index 8c170078f54..7c088646b61 100644 --- a/ydb/library/yql/providers/ydb/provider/yql_ydb_settings.cpp +++ b/ydb/library/yql/providers/ydb/provider/yql_ydb_settings.cpp @@ -21,7 +21,7 @@ void TYdbConfiguration::Init( const TYdbGatewayConfig& config, TIntrusivePtr<TTypeAnnotationContext> typeCtx, const std::shared_ptr<NYql::IDatabaseAsyncResolver> dbResolver, - THashMap<std::pair<TString, NYql::DatabaseType>, NYql::TDatabaseAuth>& databaseIds) + THashMap<std::pair<TString, NYql::EDatabaseType>, NYql::TDatabaseAuth>& databaseIds) { TVector<TString> clusters(Reserve(config.ClusterMappingSize())); for (auto& cluster: config.GetClusterMapping()) { @@ -43,7 +43,7 @@ void TYdbConfiguration::Init( if (dbResolver) { if (cluster.GetId()) { - databaseIds[std::make_pair(cluster.GetId(), NYql::DatabaseType::Ydb)] = + databaseIds[std::make_pair(cluster.GetId(), NYql::EDatabaseType::Ydb)] = NYql::TDatabaseAuth{structuredTokenJson, cluster.GetAddBearerToToken()}; DbId2Clusters[cluster.GetId()].emplace_back(cluster.GetName()); } diff --git a/ydb/library/yql/providers/ydb/provider/yql_ydb_settings.h b/ydb/library/yql/providers/ydb/provider/yql_ydb_settings.h index 1d72a52cd34..ee2bc0df18c 100644 --- a/ydb/library/yql/providers/ydb/provider/yql_ydb_settings.h +++ b/ydb/library/yql/providers/ydb/provider/yql_ydb_settings.h @@ -34,7 +34,7 @@ struct TYdbConfiguration : public TYdbSettings, public NCommon::TSettingDispatch const TYdbGatewayConfig& config, TIntrusivePtr<TTypeAnnotationContext> typeCtx, const std::shared_ptr<NYql::IDatabaseAsyncResolver> dbResolver, - THashMap<std::pair<TString, NYql::DatabaseType>, NYql::TDatabaseAuth>& databaseIds + THashMap<std::pair<TString, NYql::EDatabaseType>, NYql::TDatabaseAuth>& databaseIds ); bool HasCluster(TStringBuf cluster) const; diff --git a/ydb/services/fq/ut_integration/CMakeLists.darwin-x86_64.txt b/ydb/services/fq/ut_integration/CMakeLists.darwin-x86_64.txt index 00b63b10631..48945309966 100644 --- a/ydb/services/fq/ut_integration/CMakeLists.darwin-x86_64.txt +++ b/ydb/services/fq/ut_integration/CMakeLists.darwin-x86_64.txt @@ -25,6 +25,7 @@ target_link_libraries(ydb-services-fq-ut_integration PUBLIC cpp-regex-pcre library-cpp-svnversion fq-libs-control_plane_storage + fq-libs-db_id_async_resolver_impl fq-libs-db_schema fq-libs-private_client core-testlib-default diff --git a/ydb/services/fq/ut_integration/CMakeLists.linux-aarch64.txt b/ydb/services/fq/ut_integration/CMakeLists.linux-aarch64.txt index 971833aa8c9..365ca27a222 100644 --- a/ydb/services/fq/ut_integration/CMakeLists.linux-aarch64.txt +++ b/ydb/services/fq/ut_integration/CMakeLists.linux-aarch64.txt @@ -25,6 +25,7 @@ target_link_libraries(ydb-services-fq-ut_integration PUBLIC cpp-regex-pcre library-cpp-svnversion fq-libs-control_plane_storage + fq-libs-db_id_async_resolver_impl fq-libs-db_schema fq-libs-private_client core-testlib-default diff --git a/ydb/services/fq/ut_integration/CMakeLists.linux-x86_64.txt b/ydb/services/fq/ut_integration/CMakeLists.linux-x86_64.txt index 2cb80bd0a8f..42f746e5097 100644 --- a/ydb/services/fq/ut_integration/CMakeLists.linux-x86_64.txt +++ b/ydb/services/fq/ut_integration/CMakeLists.linux-x86_64.txt @@ -26,6 +26,7 @@ target_link_libraries(ydb-services-fq-ut_integration PUBLIC cpp-regex-pcre library-cpp-svnversion fq-libs-control_plane_storage + fq-libs-db_id_async_resolver_impl fq-libs-db_schema fq-libs-private_client core-testlib-default diff --git a/ydb/services/fq/ut_integration/CMakeLists.windows-x86_64.txt b/ydb/services/fq/ut_integration/CMakeLists.windows-x86_64.txt index 354170ff722..ed7bc57c8dd 100644 --- a/ydb/services/fq/ut_integration/CMakeLists.windows-x86_64.txt +++ b/ydb/services/fq/ut_integration/CMakeLists.windows-x86_64.txt @@ -25,6 +25,7 @@ target_link_libraries(ydb-services-fq-ut_integration PUBLIC cpp-regex-pcre library-cpp-svnversion fq-libs-control_plane_storage + fq-libs-db_id_async_resolver_impl fq-libs-db_schema fq-libs-private_client core-testlib-default diff --git a/ydb/services/fq/ut_integration/fq_ut.cpp b/ydb/services/fq/ut_integration/fq_ut.cpp index a2ada19c336..a13fd6f13b5 100644 --- a/ydb/services/fq/ut_integration/fq_ut.cpp +++ b/ydb/services/fq/ut_integration/fq_ut.cpp @@ -9,6 +9,7 @@ #include <ydb/core/fq/libs/control_plane_storage/message_builders.h> #include <ydb/core/fq/libs/actors/database_resolver.h> +#include <ydb/core/fq/libs/db_id_async_resolver_impl/mdb_host_transformer.h> #include <ydb/library/yql/public/issue/yql_issue_message.h> @@ -858,9 +859,21 @@ Y_UNIT_TEST_SUITE(Yq_1) { Y_UNIT_TEST_SUITE(Yq_2) { // use fork for data test due to ch initialization problem Y_UNIT_TEST(Test_HostNameTrasformation) { - UNIT_ASSERT_VALUES_EQUAL(::NFq::TransformMdbHostToCorrectFormat("rc1c-p5waby2y5y1kb5ue.mdb.yandexcloud.net"), "rc1c-p5waby2y5y1kb5ue.db.yandex.net:8443"); - UNIT_ASSERT_VALUES_EQUAL(::NFq::TransformMdbHostToCorrectFormat("xxx.xxx"), "xxx.db.yandex.net:8443"); - UNIT_ASSERT_VALUES_EQUAL(::NFq::TransformMdbHostToCorrectFormat("host."), "host.db.yandex.net:8443"); + { + auto transformer = ::NFq::MakeTMdbHostTransformerLegacy(); + UNIT_ASSERT_VALUES_EQUAL(transformer->ToEndpoint(NYql::EDatabaseType::ClickHouse, "rc1c-p5waby2y5y1kb5ue.db.yandex.net"), + "rc1c-p5waby2y5y1kb5ue.db.yandex.net:8443"); + UNIT_ASSERT_VALUES_EQUAL(transformer->ToEndpoint(NYql::EDatabaseType::ClickHouse, "ya.ru"), + "ya.db.yandex.net:8443"); + } + + { + auto transformer = ::NFq::MakeTMdbHostTransformerGeneric(); + UNIT_ASSERT_VALUES_EQUAL(::NFq::MakeTMdbHostTransformerGeneric()->ToEndpoint(NYql::EDatabaseType::ClickHouse, "rc1a-d6dv17lv47v5mcop.mdb.yandexcloud.net"), + "rc1a-d6dv17lv47v5mcop.mdb.yandexcloud.net:9000"); + UNIT_ASSERT_VALUES_EQUAL(::NFq::MakeTMdbHostTransformerGeneric()->ToEndpoint(NYql::EDatabaseType::PostgreSQL, "rc1b-eyt6dtobu96rwydq.mdb.yandexcloud.net"), + "rc1b-eyt6dtobu96rwydq.mdb.yandexcloud.net:6432"); + } } SIMPLE_UNIT_FORKED_TEST(ReadFromYdbOverYq) { diff --git a/ydb/services/fq/ut_integration/ya.make b/ydb/services/fq/ut_integration/ya.make index 9f3f25f0a2a..2548e272a92 100644 --- a/ydb/services/fq/ut_integration/ya.make +++ b/ydb/services/fq/ut_integration/ya.make @@ -15,6 +15,7 @@ PEERDIR( library/cpp/regex/pcre library/cpp/svnversion ydb/core/fq/libs/control_plane_storage + ydb/core/fq/libs/db_id_async_resolver_impl ydb/core/fq/libs/db_schema ydb/core/fq/libs/private_client ydb/core/testlib/default |