summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorvitalyisaev <[email protected]>2023-07-11 15:37:20 +0300
committervitalyisaev <[email protected]>2023-07-11 15:37:20 +0300
commit208ce864f80a4e05a6dc4e1fe1145075a7e654de (patch)
tree68e86a239dce25bc804ccf64476e0438cf577372
parent765740530ffb3fc45a4621fea927d01f233811ac (diff)
YQ Connector: support managed PostgreSQL in dqrun
YQ Connector: support managed PostgreSQL
-rw-r--r--ydb/core/fq/libs/actors/database_resolver.cpp100
-rw-r--r--ydb/core/fq/libs/actors/database_resolver.h2
-rw-r--r--ydb/core/fq/libs/actors/run_actor.cpp11
-rw-r--r--ydb/core/fq/libs/config/protos/common.proto2
-rw-r--r--ydb/core/fq/libs/db_id_async_resolver_impl/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/core/fq/libs/db_id_async_resolver_impl/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/core/fq/libs/db_id_async_resolver_impl/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/core/fq/libs/db_id_async_resolver_impl/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/core/fq/libs/db_id_async_resolver_impl/db_async_resolver_impl.cpp10
-rw-r--r--ydb/core/fq/libs/db_id_async_resolver_impl/db_async_resolver_impl.h8
-rw-r--r--ydb/core/fq/libs/db_id_async_resolver_impl/mdb_host_transformer.cpp55
-rw-r--r--ydb/core/fq/libs/db_id_async_resolver_impl/mdb_host_transformer.h10
-rw-r--r--ydb/core/fq/libs/db_id_async_resolver_impl/ya.make1
-rw-r--r--ydb/core/fq/libs/events/events.h15
-rw-r--r--ydb/core/fq/libs/test_connection/test_connection.cpp4
-rw-r--r--ydb/core/fq/libs/test_connection/test_data_streams.cpp12
-rw-r--r--ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_io_discovery.cpp14
-rw-r--r--ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_provider.h2
-rw-r--r--ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_settings.h4
-rw-r--r--ydb/library/yql/providers/common/db_id_async_resolver/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/library/yql/providers/common/db_id_async_resolver/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/library/yql/providers/common/db_id_async_resolver/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/library/yql/providers/common/db_id_async_resolver/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver.h38
-rw-r--r--ydb/library/yql/providers/common/db_id_async_resolver/mdb_host_transformer.h17
-rw-r--r--ydb/library/yql/providers/common/db_id_async_resolver/ya.make2
-rw-r--r--ydb/library/yql/providers/generic/provider/yql_generic_datasource_type_ann.cpp2
-rw-r--r--ydb/library/yql/providers/generic/provider/yql_generic_io_discovery.cpp17
-rw-r--r--ydb/library/yql/providers/generic/provider/yql_generic_settings.h2
-rw-r--r--ydb/library/yql/providers/pq/provider/yql_pq_datasink_io_discovery.cpp12
-rw-r--r--ydb/library/yql/providers/pq/provider/yql_pq_helpers.cpp6
-rw-r--r--ydb/library/yql/providers/pq/provider/yql_pq_helpers.h4
-rw-r--r--ydb/library/yql/providers/pq/provider/yql_pq_io_discovery.cpp12
-rw-r--r--ydb/library/yql/providers/pq/provider/yql_pq_provider.h2
-rw-r--r--ydb/library/yql/providers/pq/provider/yql_pq_settings.cpp4
-rw-r--r--ydb/library/yql/providers/pq/provider/yql_pq_settings.h2
-rw-r--r--ydb/library/yql/providers/ydb/provider/yql_ydb_io_discovery.cpp14
-rw-r--r--ydb/library/yql/providers/ydb/provider/yql_ydb_provider.h2
-rw-r--r--ydb/library/yql/providers/ydb/provider/yql_ydb_settings.cpp4
-rw-r--r--ydb/library/yql/providers/ydb/provider/yql_ydb_settings.h2
-rw-r--r--ydb/services/fq/ut_integration/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/services/fq/ut_integration/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/services/fq/ut_integration/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/services/fq/ut_integration/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/services/fq/ut_integration/fq_ut.cpp19
-rw-r--r--ydb/services/fq/ut_integration/ya.make1
46 files changed, 287 insertions, 137 deletions
diff --git a/ydb/core/fq/libs/actors/database_resolver.cpp b/ydb/core/fq/libs/actors/database_resolver.cpp
index 41341eea4b7..ee5aaccd8cd 100644
--- a/ydb/core/fq/libs/actors/database_resolver.cpp
+++ b/ydb/core/fq/libs/actors/database_resolver.cpp
@@ -18,14 +18,12 @@ namespace NFq {
using namespace NActors;
using namespace NYql;
-using TEndpoint = NYql::TDbResolverResponse::TEndpoint;
+using TEndpoint = NYql::TDatabaseResolverResponse::TEndpoint;
-using TParsers = THashMap<NYql::DatabaseType, std::function<TEndpoint(NJson::TJsonValue& body, bool)>>;
-using TCache = TTtlCache<std::tuple<TString, NYql::DatabaseType, NYql::TDatabaseAuth>, std::variant<TEndpoint, TString>>;
+using TParser = std::function<TEndpoint(NJson::TJsonValue& body, const NYql::IMdbHostTransformer::TPtr& mdbHostTransformer)>;
+using TParsers = THashMap<NYql::EDatabaseType, TParser>;
+using TCache = TTtlCache<std::tuple<TString, NYql::EDatabaseType, NYql::TDatabaseAuth>, std::variant<TEndpoint, TString>>;
-TString TransformMdbHostToCorrectFormat(const TString& mdbHost) {
- return mdbHost.substr(0, mdbHost.find('.')) + ".db.yandex.net:8443";
-}
class TResponseProcessor : public TActorBootstrapped<TResponseProcessor>
{
@@ -37,16 +35,16 @@ public:
TResponseProcessor(
const TActorId sender,
TCache& cache,
- const TDbResolverResponse::TDatabaseEndpointsMap& ready,
- const THashMap<NHttp::THttpOutgoingRequestPtr, std::tuple<TString, NYql::DatabaseType, NYql::TDatabaseAuth>>& requests,
+ const TDatabaseResolverResponse::TDatabaseEndpointsMap& ready,
+ const THashMap<NHttp::THttpOutgoingRequestPtr, std::tuple<TString, NYql::EDatabaseType, NYql::TDatabaseAuth>>& requests,
const TString& traceId,
- bool mdbTransformHost,
+ const NYql::IMdbHostTransformer::TPtr& mdbHostTransformer,
const TParsers& parsers)
: Sender(sender)
, Cache(cache)
, Requests(requests)
, TraceId(traceId)
- , MdbTransformHost(mdbTransformHost)
+ , MdbHostTransformer(mdbHostTransformer)
, DatabaseId2Endpoint(ready)
, Parsers(parsers)
{ }
@@ -102,7 +100,7 @@ private:
Send(Sender,
new TEvents::TEvEndpointResponse(
- NYql::TDbResolverResponse(std::move(DatabaseId2Endpoint), Success, issues)));
+ NYql::TDatabaseResolverResponse(std::move(DatabaseId2Endpoint), Success, issues)));
PassAway();
}
@@ -111,7 +109,7 @@ private:
TString status;
TString errorMessage;
TString databaseId;
- NYql::DatabaseType databaseType = DatabaseType::Ydb;
+ NYql::EDatabaseType databaseType = EDatabaseType::Ydb;
NYql::TDatabaseAuth info;
TMaybe<TEndpoint> result;
HandledIds++;
@@ -129,9 +127,9 @@ private:
TParsers::const_iterator parserIt;
if (parseJsonOk && (parserIt = Parsers.find(databaseType)) != Parsers.end()) {
try {
- auto res = parserIt->second(databaseInfo, MdbTransformHost);
+ auto res = parserIt->second(databaseInfo, MdbHostTransformer);
LOG_D("Got db_id: " << databaseId
- << ", db type: " << static_cast<std::underlying_type<NYql::DatabaseType>::type>(databaseType)
+ << ", db type: " << static_cast<std::underlying_type<NYql::EDatabaseType>::type>(databaseType)
<< ", endpoint: " << res.Endpoint
<< ", database: " << res.Database);
DatabaseId2Endpoint[std::make_pair(databaseId, databaseType)] = res;
@@ -140,13 +138,13 @@ private:
errorMessage = TStringBuilder()
<< " Couldn't resolve "
<< "databaseId: " << databaseId
- << ", db type: " << static_cast<std::underlying_type<NYql::DatabaseType>::type>(databaseType) << "\n"
+ << ", db type: " << static_cast<std::underlying_type<NYql::EDatabaseType>::type>(databaseType) << "\n"
<< CurrentExceptionMessage();
}
} else {
errorMessage = TStringBuilder() << "Unable to parse database information."
<< "Database Id: " << databaseId
- << ", db type: " << static_cast<std::underlying_type<NYql::DatabaseType>::type>(databaseType);
+ << ", db type: " << static_cast<std::underlying_type<NYql::EDatabaseType>::type>(databaseType);
}
}
} else {
@@ -184,10 +182,10 @@ private:
private:
const TActorId Sender;
TCache& Cache;
- const THashMap<NHttp::THttpOutgoingRequestPtr, std::tuple<TString, NYql::DatabaseType, NYql::TDatabaseAuth>> Requests;
+ const THashMap<NHttp::THttpOutgoingRequestPtr, std::tuple<TString, NYql::EDatabaseType, NYql::TDatabaseAuth>> Requests;
const TString TraceId;
- const bool MdbTransformHost;
- TDbResolverResponse::TDatabaseEndpointsMap DatabaseId2Endpoint;
+ const NYql::IMdbHostTransformer::TPtr MdbHostTransformer;
+ TDatabaseResolverResponse::TDatabaseEndpointsMap DatabaseId2Endpoint;
size_t HandledIds = 0;
bool Success = true;
const TParsers& Parsers;
@@ -207,7 +205,7 @@ public:
.SetErrorTtl(TDuration::Minutes(1))
.SetMaxSize(1000000))
{
- auto ydbParser = [](NJson::TJsonValue& databaseInfo, bool) {
+ auto ydbParser = [](NJson::TJsonValue& databaseInfo, const NYql::IMdbHostTransformer::TPtr&) {
bool secure = false;
TString endpoint = databaseInfo.GetMap().at("endpoint").GetStringRobust();
TString prefix("/?database=");
@@ -228,10 +226,10 @@ public:
Y_ENSURE(endpoint);
return TEndpoint{endpoint, database, secure};
};
- Parsers[NYql::DatabaseType::Ydb] = ydbParser;
- Parsers[NYql::DatabaseType::DataStreams] = [ydbParser](NJson::TJsonValue& databaseInfo, bool mdbTransformHost)
+ Parsers[NYql::EDatabaseType::Ydb] = ydbParser;
+ Parsers[NYql::EDatabaseType::DataStreams] = [ydbParser](NJson::TJsonValue& databaseInfo, const NYql::IMdbHostTransformer::TPtr& mdbHostTransformer)
{
- auto ret = ydbParser(databaseInfo, mdbTransformHost);
+ auto ret = ydbParser(databaseInfo, mdbHostTransformer);
// TODO: Take explicit field from MVP
if (ret.Endpoint.StartsWith("ydb.")) {
// Replace "ydb." -> "yds."
@@ -239,7 +237,7 @@ public:
}
return ret;
};
- Parsers[NYql::DatabaseType::ClickHouse] = [](NJson::TJsonValue& databaseInfo, bool mdbTransformHost) {
+ Parsers[NYql::EDatabaseType::ClickHouse] = [](NJson::TJsonValue& databaseInfo, const NYql::IMdbHostTransformer::TPtr& mdbHostTransformer) {
TString endpoint;
TVector<TString> aliveHosts;
for (const auto& host : databaseInfo.GetMap().at("hosts").GetArraySafe()) {
@@ -248,37 +246,44 @@ public:
}
}
if (!aliveHosts.empty()) {
- endpoint = aliveHosts[std::rand() % static_cast<int>(aliveHosts.size())];
+ endpoint = mdbHostTransformer->ToEndpoint(
+ NYql::EDatabaseType::ClickHouse,
+ aliveHosts[std::rand() % static_cast<int>(aliveHosts.size())]
+ );
}
if (!endpoint) {
- ythrow yexception() << "No ALIVE ClickHouse hosts exist";
+ ythrow yexception() << "No ALIVE ClickHouse hosts found";
}
- endpoint = mdbTransformHost ? TransformMdbHostToCorrectFormat(endpoint) : endpoint;
return TEndpoint{endpoint, "", true};
};
- // TODO: https://st.yandex-team.ru/YQ-2171: support other data sources than ClickHouse
- Parsers[NYql::DatabaseType::Generic] = [](NJson::TJsonValue& databaseInfo, bool mdbTransformHost) {
+ Parsers[NYql::EDatabaseType::PostgreSQL] = [](NJson::TJsonValue& databaseInfo, const NYql::IMdbHostTransformer::TPtr& mdbHostTransformer) {
TString endpoint;
TVector<TString> aliveHosts;
-
for (const auto& host : databaseInfo.GetMap().at("hosts").GetArraySafe()) {
- if (host["health"].GetString() == "ALIVE" && host["type"].GetString() == "CLICKHOUSE") {
+
+ // all host services must be alive
+ bool alive = true;
+ for (const auto& service: host.GetMap().at("services").GetArraySafe()) {
+ if (service["health"].GetString() != "ALIVE") {
+ alive = false;
+ break;
+ }
+ }
+
+ if (alive) {
aliveHosts.push_back(host["name"].GetString());
}
}
-
if (!aliveHosts.empty()) {
- endpoint = aliveHosts[std::rand() % static_cast<int>(aliveHosts.size())];
+ endpoint = mdbHostTransformer->ToEndpoint(
+ NYql::EDatabaseType::PostgreSQL,
+ aliveHosts[std::rand() % static_cast<int>(aliveHosts.size())]
+ );
}
if (!endpoint) {
- ythrow yexception() << "No ALIVE database host exists";
- }
-
- if (mdbTransformHost) {
- // TODO: https://st.yandex-team.ru/YQ-2170: support secure connections on 9440
- endpoint += ":9000";
+ ythrow yexception() << "No ALIVE PostgreSQL hosts found";
}
return TEndpoint{endpoint, "", true};
@@ -295,7 +300,7 @@ private:
void SendResponse(
const NActors::TActorId& recipient,
- TDbResolverResponse::TDatabaseEndpointsMap&& ready,
+ TDatabaseResolverResponse::TDatabaseEndpointsMap&& ready,
bool success = true,
const TString& errorMessage = "")
{
@@ -304,15 +309,15 @@ private:
if (errorMessage)
issues.AddIssue(errorMessage);
Send(recipient,
- new TEvents::TEvEndpointResponse(NYql::TDbResolverResponse{std::move(ready), success, issues}));
+ new TEvents::TEvEndpointResponse(NYql::TDatabaseResolverResponse{std::move(ready), success, issues}));
}
void Handle(TEvents::TEvEndpointRequest::TPtr ev)
{
TraceId = ev->Get()->TraceId;
LOG_D("Start databaseId resolver for " << ev->Get()->DatabaseIds.size() << " ids");
- THashMap<NHttp::THttpOutgoingRequestPtr, std::tuple<TString, NYql::DatabaseType, NYql::TDatabaseAuth>> requests; // request, (dbId, type, info)
- TDbResolverResponse::TDatabaseEndpointsMap ready;
+ THashMap<NHttp::THttpOutgoingRequestPtr, std::tuple<TString, NYql::EDatabaseType, NYql::TDatabaseAuth>> requests; // request, (dbId, type, info)
+ TDatabaseResolverResponse::TDatabaseEndpointsMap ready;
for (const auto& [p, info] : ev->Get()->DatabaseIds) {
const auto& [databaseId, type] = p;
TMaybe<std::variant<TEndpoint, TString>> cacheVal;
@@ -336,13 +341,14 @@ private:
try {
TString url;
- if (IsIn({NYql::DatabaseType::Ydb, NYql::DatabaseType::DataStreams }, type)) {
+ if (IsIn({NYql::EDatabaseType::Ydb, NYql::EDatabaseType::DataStreams }, type)) {
url = TUrlBuilder(ev->Get()->YdbMvpEndpoint + "/database")
.AddUrlParam("databaseId", databaseId)
.Build();
- } else {
+ } else if (IsIn({NYql::EDatabaseType::ClickHouse, NYql::EDatabaseType::PostgreSQL }, type)) {
YQL_ENSURE(ev->Get()->MdbGateway, "empty MDB Gateway");
- url = TUrlBuilder(ev->Get()->MdbGateway + "/managed-clickhouse/v1/clusters/")
+ url = TUrlBuilder(
+ ev->Get()->MdbGateway + "/managed-" + NYql::DatabaseTypeToString(type) + "/v1/clusters/")
.AddPathComponent(databaseId)
.AddPathComponent("hosts")
.Build();
@@ -369,7 +375,7 @@ private:
if (!requests.empty()) {
auto helper = Register(
- new TResponseProcessor(ev->Sender, Cache, ready, requests, TraceId, ev->Get()->MdbTransformHost, Parsers));
+ new TResponseProcessor(ev->Sender, Cache, ready, requests, TraceId, ev->Get()->MdbHostTransformer, Parsers));
for (const auto& [request, _] : requests) {
TActivationContext::Send(new IEventHandle(HttpProxy, helper, new NHttp::TEvHttpProxy::TEvHttpOutgoingRequest(request)));
diff --git a/ydb/core/fq/libs/actors/database_resolver.h b/ydb/core/fq/libs/actors/database_resolver.h
index cdabf7194ac..21035416820 100644
--- a/ydb/core/fq/libs/actors/database_resolver.h
+++ b/ydb/core/fq/libs/actors/database_resolver.h
@@ -6,8 +6,6 @@
namespace NFq {
-TString TransformMdbHostToCorrectFormat(const TString& mdbHost);
-
NActors::IActor* CreateDatabaseResolver(NActors::TActorId httpProxy, NYql::ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory);
} /* namespace NFq */
diff --git a/ydb/core/fq/libs/actors/run_actor.cpp b/ydb/core/fq/libs/actors/run_actor.cpp
index 3e6d6e29286..1399465abd0 100644
--- a/ydb/core/fq/libs/actors/run_actor.cpp
+++ b/ydb/core/fq/libs/actors/run_actor.cpp
@@ -59,6 +59,7 @@
#include <ydb/core/fq/libs/control_plane_storage/events/events.h>
#include <ydb/core/fq/libs/control_plane_storage/util.h>
#include <ydb/core/fq/libs/db_id_async_resolver_impl/db_async_resolver_impl.h>
+#include <ydb/core/fq/libs/db_id_async_resolver_impl/mdb_host_transformer.h>
#include <ydb/core/fq/libs/gateway/empty_gateway.h>
#include <ydb/core/fq/libs/private_client/events.h>
#include <ydb/core/fq/libs/private_client/private_client.h>
@@ -1824,8 +1825,14 @@ private:
clusters);
TVector<TDataProviderInitializer> dataProvidersInit;
- const std::shared_ptr<IDatabaseAsyncResolver> dbResolver = std::make_shared<TDatabaseAsyncResolverImpl>(NActors::TActivationContext::ActorSystem(), Params.DatabaseResolver,
- Params.Config.GetCommon().GetYdbMvpCloudEndpoint(), Params.Config.GetCommon().GetMdbGateway(), Params.Config.GetCommon().GetMdbTransformHost(), Params.QueryId);
+ const std::shared_ptr<IDatabaseAsyncResolver> dbResolver = std::make_shared<TDatabaseAsyncResolverImpl>(
+ NActors::TActivationContext::ActorSystem(),
+ Params.DatabaseResolver,
+ Params.Config.GetCommon().GetYdbMvpCloudEndpoint(),
+ Params.Config.GetCommon().GetMdbGateway(),
+ NFq::MakeTMdbHostTransformerGeneric(),
+ // Params.Config.GetCommon().GetMdbTransformHost(),
+ Params.QueryId);
{
// TBD: move init to better place
QueryStateUpdateRequest.set_scope(Params.Scope.ToString());
diff --git a/ydb/core/fq/libs/config/protos/common.proto b/ydb/core/fq/libs/config/protos/common.proto
index 7bb6da3577b..ade8d5562c7 100644
--- a/ydb/core/fq/libs/config/protos/common.proto
+++ b/ydb/core/fq/libs/config/protos/common.proto
@@ -17,7 +17,7 @@ message TCommonConfig {
string YdbMvpCloudEndpoint = 2;
string MdbGateway = 3;
bool UseBearerForYdb = 4;
- bool MdbTransformHost = 5;
+ bool MdbTransformHost = 5 [deprecated = true]; // TODO: remove it in https://st.yandex-team.ru/YQ-2229
string ObjectStorageEndpoint = 6;
string IdsPrefix = 7;
uint64 MaxTasksPerOperation = 8;
diff --git a/ydb/core/fq/libs/db_id_async_resolver_impl/CMakeLists.darwin-x86_64.txt b/ydb/core/fq/libs/db_id_async_resolver_impl/CMakeLists.darwin-x86_64.txt
index ec08e5f4a83..3c27829d986 100644
--- a/ydb/core/fq/libs/db_id_async_resolver_impl/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/fq/libs/db_id_async_resolver_impl/CMakeLists.darwin-x86_64.txt
@@ -21,4 +21,5 @@ target_link_libraries(fq-libs-db_id_async_resolver_impl PUBLIC
)
target_sources(fq-libs-db_id_async_resolver_impl PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/db_id_async_resolver_impl/db_async_resolver_impl.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/db_id_async_resolver_impl/mdb_host_transformer.cpp
)
diff --git a/ydb/core/fq/libs/db_id_async_resolver_impl/CMakeLists.linux-aarch64.txt b/ydb/core/fq/libs/db_id_async_resolver_impl/CMakeLists.linux-aarch64.txt
index 1b9b50a5624..a26a3f5d590 100644
--- a/ydb/core/fq/libs/db_id_async_resolver_impl/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/fq/libs/db_id_async_resolver_impl/CMakeLists.linux-aarch64.txt
@@ -22,4 +22,5 @@ target_link_libraries(fq-libs-db_id_async_resolver_impl PUBLIC
)
target_sources(fq-libs-db_id_async_resolver_impl PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/db_id_async_resolver_impl/db_async_resolver_impl.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/db_id_async_resolver_impl/mdb_host_transformer.cpp
)
diff --git a/ydb/core/fq/libs/db_id_async_resolver_impl/CMakeLists.linux-x86_64.txt b/ydb/core/fq/libs/db_id_async_resolver_impl/CMakeLists.linux-x86_64.txt
index 1b9b50a5624..a26a3f5d590 100644
--- a/ydb/core/fq/libs/db_id_async_resolver_impl/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/fq/libs/db_id_async_resolver_impl/CMakeLists.linux-x86_64.txt
@@ -22,4 +22,5 @@ target_link_libraries(fq-libs-db_id_async_resolver_impl PUBLIC
)
target_sources(fq-libs-db_id_async_resolver_impl PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/db_id_async_resolver_impl/db_async_resolver_impl.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/db_id_async_resolver_impl/mdb_host_transformer.cpp
)
diff --git a/ydb/core/fq/libs/db_id_async_resolver_impl/CMakeLists.windows-x86_64.txt b/ydb/core/fq/libs/db_id_async_resolver_impl/CMakeLists.windows-x86_64.txt
index ec08e5f4a83..3c27829d986 100644
--- a/ydb/core/fq/libs/db_id_async_resolver_impl/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/fq/libs/db_id_async_resolver_impl/CMakeLists.windows-x86_64.txt
@@ -21,4 +21,5 @@ target_link_libraries(fq-libs-db_id_async_resolver_impl PUBLIC
)
target_sources(fq-libs-db_id_async_resolver_impl PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/db_id_async_resolver_impl/db_async_resolver_impl.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/db_id_async_resolver_impl/mdb_host_transformer.cpp
)
diff --git a/ydb/core/fq/libs/db_id_async_resolver_impl/db_async_resolver_impl.cpp b/ydb/core/fq/libs/db_id_async_resolver_impl/db_async_resolver_impl.cpp
index 2fcacfe931b..4621a2c4be0 100644
--- a/ydb/core/fq/libs/db_id_async_resolver_impl/db_async_resolver_impl.cpp
+++ b/ydb/core/fq/libs/db_id_async_resolver_impl/db_async_resolver_impl.cpp
@@ -10,18 +10,18 @@ TDatabaseAsyncResolverImpl::TDatabaseAsyncResolverImpl(
const NActors::TActorId& recipient,
const TString& ydbMvpEndpoint,
const TString& mdbGateway,
- bool mdbTransformHost,
+ NYql::IMdbHostTransformer::TPtr&& mdbHostTransformer,
const TString& traceId)
: ActorSystem(actorSystem)
, Recipient(recipient)
, YdbMvpEndpoint(ydbMvpEndpoint)
, MdbGateway(mdbGateway)
- , MdbTransformHost(mdbTransformHost)
+ , MdbHostTransformer(std::move(mdbHostTransformer))
, TraceId(traceId)
{
}
-TFuture<NYql::TDbResolverResponse> TDatabaseAsyncResolverImpl::ResolveIds(const TDatabaseAuthMap& ids) const
+TFuture<NYql::TDatabaseResolverResponse> TDatabaseAsyncResolverImpl::ResolveIds(const TDatabaseAuthMap& ids) const
{
// Cloud database ids validataion
for (const auto& kv: ids) {
@@ -29,7 +29,7 @@ TFuture<NYql::TDbResolverResponse> TDatabaseAsyncResolverImpl::ResolveIds(const
YQL_ENSURE(kv.first.first, "empty cluster name");
}
- auto promise = NewPromise<NYql::TDbResolverResponse>();
+ auto promise = NewPromise<NYql::TDatabaseResolverResponse>();
TDuration timeout = TDuration::Seconds(40);
auto callback = MakeHolder<NYql::TRichActorFutureCallback<TEvents::TEvEndpointResponse>>(
[promise] (TAutoPtr<NActors::TEventHandle<TEvents::TEvEndpointResponse>>& event) mutable {
@@ -45,7 +45,7 @@ TFuture<NYql::TDbResolverResponse> TDatabaseAsyncResolverImpl::ResolveIds(const
ActorSystem->Send(new NActors::IEventHandle(Recipient, callbackId,
new TEvents::TEvEndpointRequest(ids, YdbMvpEndpoint, MdbGateway,
- TraceId, MdbTransformHost)));
+ TraceId, MdbHostTransformer)));
return promise.GetFuture();
}
diff --git a/ydb/core/fq/libs/db_id_async_resolver_impl/db_async_resolver_impl.h b/ydb/core/fq/libs/db_id_async_resolver_impl/db_async_resolver_impl.h
index dbb51e48b27..8feb0ecdcc8 100644
--- a/ydb/core/fq/libs/db_id_async_resolver_impl/db_async_resolver_impl.h
+++ b/ydb/core/fq/libs/db_id_async_resolver_impl/db_async_resolver_impl.h
@@ -1,6 +1,8 @@
#pragma once
+
#include <ydb/core/fq/libs/events/events.h>
#include <ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver.h>
+#include <ydb/library/yql/providers/common/db_id_async_resolver/mdb_host_transformer.h>
#include <ydb/library/yql/providers/dq/actors/actor_helpers.h>
namespace NFq {
@@ -12,17 +14,17 @@ public:
const NActors::TActorId& recipient,
const TString& ydbMvpEndpoint,
const TString& mdbGateway,
- bool mdbTransformHost = false,
+ NYql::IMdbHostTransformer::TPtr&& mdbHostTransformer,
const TString& traceId = ""
);
- NThreading::TFuture<NYql::TDbResolverResponse> ResolveIds(const TDatabaseAuthMap& ids) const override;
+ NThreading::TFuture<NYql::TDatabaseResolverResponse> ResolveIds(const TDatabaseAuthMap& ids) const override;
private:
NActors::TActorSystem* ActorSystem;
const NActors::TActorId Recipient;
const TString YdbMvpEndpoint;
const TString MdbGateway;
- const bool MdbTransformHost = false;
+ NYql::IMdbHostTransformer::TPtr MdbHostTransformer;
const TString TraceId;
};
diff --git a/ydb/core/fq/libs/db_id_async_resolver_impl/mdb_host_transformer.cpp b/ydb/core/fq/libs/db_id_async_resolver_impl/mdb_host_transformer.cpp
new file mode 100644
index 00000000000..8772a7bf131
--- /dev/null
+++ b/ydb/core/fq/libs/db_id_async_resolver_impl/mdb_host_transformer.cpp
@@ -0,0 +1,55 @@
+#include "mdb_host_transformer.h"
+
+namespace NFq {
+ // TMdbHostTransformerLegacy implements behavior required by YQL legacy ClickHouse provider
+ class TMdbHostTransformerLegacy: public NYql::IMdbHostTransformer {
+ TString ToEndpoint(const NYql::EDatabaseType databaseType, const TString& mdbHost) const override {
+ // Inherited from here
+ // https://a.yandex-team.ru/arcadia/ydb/core/fq/libs/actors/database_resolver.cpp?rev=r11819335#L27
+ if (databaseType == NYql::EDatabaseType::ClickHouse) {
+ return mdbHost.substr(0, mdbHost.find('.')) + ".db.yandex.net:8443";
+ }
+
+ ythrow yexception() << TStringBuilder() << "Unexpected database type: " << int(databaseType);
+ }
+ };
+
+ NYql::IMdbHostTransformer::TPtr
+ MakeTMdbHostTransformerLegacy() {
+ return std::make_shared<TMdbHostTransformerLegacy>();
+ }
+
+ // TMdbHostTransformerGeneric implements behavior required by YQL Generic provider
+ // that interacts with data sources through a separate Connector service
+ class TMdbHostTransformerGeneric: public NYql::IMdbHostTransformer {
+ TString ToEndpoint(const NYql::EDatabaseType databaseType, const TString& mdbHost) const override {
+ switch (databaseType) {
+ case NYql::EDatabaseType::ClickHouse:
+ // TODO: https://st.yandex-team.ru/YQ-2170: support secure connections on 9440
+ return mdbHost + ":9000";
+ case NYql::EDatabaseType::PostgreSQL:
+ // https://cloud.yandex.ru/docs/managed-postgresql/operations/connect
+ return mdbHost + ":6432";
+ default:
+ ythrow yexception() << TStringBuilder() << "Unexpected database type: " << int(databaseType);
+ };
+ }
+ };
+
+ NYql::IMdbHostTransformer::TPtr
+ MakeTMdbHostTransformerGeneric() {
+ return std::make_shared<TMdbHostTransformerGeneric>();
+ }
+
+ // TMdbHostTransformerNoop just does nothing
+ class TMdbHostTransformerNoop: public NYql::IMdbHostTransformer {
+ TString ToEndpoint(const NYql::EDatabaseType, const TString& mdbHost) const override {
+ return mdbHost;
+ }
+ };
+
+ NYql::IMdbHostTransformer::TPtr
+ MakeTMdbHostTransformerNoop() {
+ return std::make_shared<TMdbHostTransformerNoop>();
+ }
+} \ No newline at end of file
diff --git a/ydb/core/fq/libs/db_id_async_resolver_impl/mdb_host_transformer.h b/ydb/core/fq/libs/db_id_async_resolver_impl/mdb_host_transformer.h
new file mode 100644
index 00000000000..4a26bc6b654
--- /dev/null
+++ b/ydb/core/fq/libs/db_id_async_resolver_impl/mdb_host_transformer.h
@@ -0,0 +1,10 @@
+#pragma once
+
+#include <ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver.h>
+#include <ydb/library/yql/providers/common/db_id_async_resolver/mdb_host_transformer.h>
+
+namespace NFq {
+ NYql::IMdbHostTransformer::TPtr MakeTMdbHostTransformerLegacy();
+ NYql::IMdbHostTransformer::TPtr MakeTMdbHostTransformerGeneric();
+ NYql::IMdbHostTransformer::TPtr MakeTMdbHostTransformerNoop();
+}
diff --git a/ydb/core/fq/libs/db_id_async_resolver_impl/ya.make b/ydb/core/fq/libs/db_id_async_resolver_impl/ya.make
index 364c1ecd51f..2ac916e03c6 100644
--- a/ydb/core/fq/libs/db_id_async_resolver_impl/ya.make
+++ b/ydb/core/fq/libs/db_id_async_resolver_impl/ya.make
@@ -2,6 +2,7 @@ LIBRARY()
SRCS(
db_async_resolver_impl.cpp
+ mdb_host_transformer.cpp
)
PEERDIR(
diff --git a/ydb/core/fq/libs/events/events.h b/ydb/core/fq/libs/events/events.h
index 3d5e4e6e2c4..62bc65299a6 100644
--- a/ydb/core/fq/libs/events/events.h
+++ b/ydb/core/fq/libs/events/events.h
@@ -3,6 +3,7 @@
#include <ydb/library/yql/core/facade/yql_facade.h>
#include <ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver.h>
+#include <ydb/library/yql/providers/common/db_id_async_resolver/mdb_host_transformer.h>
#include <ydb/library/yql/providers/dq/provider/yql_dq_gateway.h>
#include <ydb/library/yql/public/issue/yql_issue.h>
@@ -105,29 +106,29 @@ struct TEvents {
};
struct TEvEndpointResponse : NActors::TEventLocal<TEvEndpointResponse, TEventIds::EvEndpointResponse> {
- NYql::TDbResolverResponse DbResolverResponse;
- explicit TEvEndpointResponse(NYql::TDbResolverResponse&& response) noexcept : DbResolverResponse(std::move(response)) {}
+ NYql::TDatabaseResolverResponse DbResolverResponse;
+ explicit TEvEndpointResponse(NYql::TDatabaseResolverResponse&& response) noexcept : DbResolverResponse(std::move(response)) {}
};
struct TEvEndpointRequest : NActors::TEventLocal<TEvEndpointRequest, TEventIds::EvEndpointRequest> {
- THashMap<std::pair<TString, NYql::DatabaseType>, NYql::TDatabaseAuth> DatabaseIds; // DbId, DatabaseType => database auth
+ const NYql::IDatabaseAsyncResolver::TDatabaseAuthMap DatabaseIds;
TString YdbMvpEndpoint;
TString MdbGateway;
TString TraceId;
- bool MdbTransformHost;
+ const NYql::IMdbHostTransformer::TPtr MdbHostTransformer;
TEvEndpointRequest(
- const THashMap<std::pair<TString, NYql::DatabaseType>, NYql::TDatabaseAuth>& databaseIds,
+ const NYql::IDatabaseAsyncResolver::TDatabaseAuthMap& databaseIds,
const TString& ydbMvpEndpoint,
const TString& mdbGateway,
const TString& traceId,
- bool mdbTransformHost)
+ const NYql::IMdbHostTransformer::TPtr& mdbHostTransformer)
: DatabaseIds(databaseIds)
, YdbMvpEndpoint(ydbMvpEndpoint)
, MdbGateway(mdbGateway)
, TraceId(traceId)
- , MdbTransformHost(mdbTransformHost)
+ , MdbHostTransformer(mdbHostTransformer)
{ }
};
diff --git a/ydb/core/fq/libs/test_connection/test_connection.cpp b/ydb/core/fq/libs/test_connection/test_connection.cpp
index 1f4a3bc90fa..ec1e760769d 100644
--- a/ydb/core/fq/libs/test_connection/test_connection.cpp
+++ b/ydb/core/fq/libs/test_connection/test_connection.cpp
@@ -9,6 +9,7 @@
#include <ydb/core/fq/libs/common/util.h>
#include <ydb/core/fq/libs/config/yq_issue.h>
#include <ydb/core/fq/libs/db_id_async_resolver_impl/db_async_resolver_impl.h>
+#include <ydb/core/fq/libs/db_id_async_resolver_impl/mdb_host_transformer.h>
#include <ydb/core/fq/libs/control_plane_storage/config.h>
#include <ydb/library/security/util.h>
@@ -146,7 +147,8 @@ public:
DbResolver = std::make_shared<NFq::TDatabaseAsyncResolverImpl>(
NActors::TActivationContext::ActorSystem(), DatabaseResolverActor,
CommonConfig.GetYdbMvpCloudEndpoint(), CommonConfig.GetMdbGateway(),
- CommonConfig.GetMdbTransformHost());
+ NFq::MakeTMdbHostTransformerGeneric()
+ );
Become(&TTestConnectionActor::StateFunc);
}
diff --git a/ydb/core/fq/libs/test_connection/test_data_streams.cpp b/ydb/core/fq/libs/test_connection/test_data_streams.cpp
index 363b5a7a660..8e9fc1f02c5 100644
--- a/ydb/core/fq/libs/test_connection/test_data_streams.cpp
+++ b/ydb/core/fq/libs/test_connection/test_data_streams.cpp
@@ -25,9 +25,9 @@ struct TEvPrivate {
static_assert(EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE), "expect EvEnd < EventSpaceEnd(TEvents::ES_PRIVATE)");
struct TEvResolveDbResponse : NActors::TEventLocal<TEvResolveDbResponse, EvResolveDbResponse> {
- NYql::TDbResolverResponse Result;
+ NYql::TDatabaseResolverResponse Result;
- TEvResolveDbResponse(const NYql::TDbResolverResponse& result)
+ TEvResolveDbResponse(const NYql::TDatabaseResolverResponse& result)
: Result(result)
{}
};
@@ -145,14 +145,14 @@ private:
SendOpenSession();
return;
}
- THashMap<std::pair<TString, NYql::DatabaseType>, NYql::TDatabaseAuth> ids;
- ids[std::pair{ClusterConfig.GetDatabaseId(), NYql::DatabaseType::DataStreams}] = {StructuredToken, CommonConfig.GetUseBearerForYdb()};
+ NYql::IDatabaseAsyncResolver::TDatabaseAuthMap ids;
+ ids[std::pair{ClusterConfig.GetDatabaseId(), NYql::EDatabaseType::DataStreams}] = {StructuredToken, CommonConfig.GetUseBearerForYdb()};
DbResolver->ResolveIds(ids).Apply([self=SelfId(), as=TActivationContext::ActorSystem()](const auto& future) {
try {
auto result = future.GetValue();
as->Send(new IEventHandle(self, self, new TEvPrivate::TEvResolveDbResponse(result), 0));
} catch (...) {
- as->Send(new IEventHandle(self, self, new TEvPrivate::TEvResolveDbResponse(NYql::TDbResolverResponse{{}, false, NYql::TIssues{MakeErrorIssue(NFq::TIssuesIds::BAD_REQUEST, CurrentExceptionMessage())}}), 0));
+ as->Send(new IEventHandle(self, self, new TEvPrivate::TEvResolveDbResponse(NYql::TDatabaseResolverResponse{{}, false, NYql::TIssues{MakeErrorIssue(NFq::TIssuesIds::BAD_REQUEST, CurrentExceptionMessage())}}), 0));
}
});
}
@@ -165,7 +165,7 @@ private:
return;
}
- auto it = response.DatabaseId2Endpoint.find(std::pair{ClusterConfig.GetDatabaseId(), NYql::DatabaseType::DataStreams});
+ auto it = response.DatabaseId2Endpoint.find(std::pair{ClusterConfig.GetDatabaseId(), NYql::EDatabaseType::DataStreams});
if (it == response.DatabaseId2Endpoint.end()) {
TC_LOG_E(Scope << " " << User << " " << NKikimr::MaskTicket(Token) << " Test data streams connection: database is not found for database_id " << ClusterConfig.GetDatabaseId());
ReplyError(TStringBuilder{} << "Test data streams connection: database is not found for database_id " << ClusterConfig.GetDatabaseId());
diff --git a/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_io_discovery.cpp b/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_io_discovery.cpp
index 8b7bc177d66..5b05314fd7b 100644
--- a/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_io_discovery.cpp
+++ b/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_io_discovery.cpp
@@ -15,7 +15,7 @@ using namespace NNodes;
class TClickHouseIODiscoveryTransformer : public TGraphTransformerBase {
-using TDbId2Endpoint = THashMap<std::pair<TString, NYql::DatabaseType>, NYql::TDbResolverResponse::TEndpoint>;
+using TDbId2Endpoint = THashMap<std::pair<TString, NYql::EDatabaseType>, NYql::TDatabaseResolverResponse::TEndpoint>;
public:
TClickHouseIODiscoveryTransformer(TClickHouseState::TPtr state)
@@ -31,7 +31,7 @@ public:
if (!State_->DbResolver)
return TStatus::Ok;
- THashMap<std::pair<TString, NYql::DatabaseType>, NYql::TDatabaseAuth> ids;
+ THashMap<std::pair<TString, NYql::EDatabaseType>, NYql::TDatabaseAuth> ids;
if (auto reads = FindNodes(input, [&](const TExprNode::TPtr& node) {
const TExprBase nodeExpr(node);
if (!nodeExpr.Maybe<TClRead>())
@@ -50,7 +50,7 @@ public:
auto dbId = State_->Configuration->Endpoints[cluster].first;
dbId = dbId.substr(0, dbId.find(':'));
YQL_CLOG(DEBUG, ProviderClickHouse) << "Found dbId: " << dbId;
- const auto idKey = std::make_pair(dbId, NYql::DatabaseType::ClickHouse);
+ const auto idKey = std::make_pair(dbId, NYql::EDatabaseType::ClickHouse);
const auto iter = State_->DatabaseIds.find(idKey);
if (iter != State_->DatabaseIds.end()) {
YQL_CLOG(DEBUG, ProviderClickHouse) << "Resolve CH id: " << dbId;
@@ -64,7 +64,7 @@ public:
return TStatus::Ok;
}
- const std::weak_ptr<NYql::TDbResolverResponse> response = DbResolverResponse_;
+ const std::weak_ptr<NYql::TDatabaseResolverResponse> response = DbResolverResponse_;
AsyncFuture_ = State_->DbResolver->ResolveIds(ids).Apply([response](auto future) {
if (const auto res = response.lock())
*res = std::move(future.ExtractValue());
@@ -84,7 +84,7 @@ public:
return TStatus::Error;
}
FullResolvedIds_.insert(DbResolverResponse_->DatabaseId2Endpoint.begin(), DbResolverResponse_->DatabaseId2Endpoint.end());
- DbResolverResponse_ = std::make_shared<NYql::TDbResolverResponse>();
+ DbResolverResponse_ = std::make_shared<NYql::TDatabaseResolverResponse>();
YQL_CLOG(DEBUG, ProviderClickHouse) << "ResolvedIds: " << FullResolvedIds_.size();
auto& endpoints = State_->Configuration->Endpoints;
const auto& id2Clusters = State_->Configuration->DbId2Clusters;
@@ -113,14 +113,14 @@ public:
void Rewind() final {
AsyncFuture_ = {};
FullResolvedIds_.clear();
- DbResolverResponse_ = std::make_shared<NYql::TDbResolverResponse>();
+ DbResolverResponse_ = std::make_shared<NYql::TDatabaseResolverResponse>();
}
private:
const TClickHouseState::TPtr State_;
NThreading::TFuture<void> AsyncFuture_;
TDbId2Endpoint FullResolvedIds_;
- std::shared_ptr<NYql::TDbResolverResponse> DbResolverResponse_ = std::make_shared<NYql::TDbResolverResponse>();
+ std::shared_ptr<NYql::TDatabaseResolverResponse> DbResolverResponse_ = std::make_shared<NYql::TDatabaseResolverResponse>();
};
}
diff --git a/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_provider.h b/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_provider.h
index 849ed534ed8..650f4a8df75 100644
--- a/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_provider.h
+++ b/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_provider.h
@@ -27,7 +27,7 @@ struct TClickHouseState : public TThrRefBase
TTypeAnnotationContext* Types = nullptr;
TClickHouseConfiguration::TPtr Configuration = MakeIntrusive<TClickHouseConfiguration>();
const NKikimr::NMiniKQL::IFunctionRegistry* FunctionRegistry = nullptr;
- THashMap<std::pair<TString, NYql::DatabaseType>, NYql::TDatabaseAuth> DatabaseIds;
+ THashMap<std::pair<TString, NYql::EDatabaseType>, NYql::TDatabaseAuth> DatabaseIds;
std::shared_ptr<NYql::IDatabaseAsyncResolver> DbResolver;
};
diff --git a/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_settings.h b/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_settings.h
index bcf1acd4f2d..fc1acfff7f4 100644
--- a/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_settings.h
+++ b/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_settings.h
@@ -23,7 +23,7 @@ struct TClickHouseConfiguration : public TClickHouseSettings, public NCommon::TS
void Init(
const TProtoConfig& config,
const std::shared_ptr<NYql::IDatabaseAsyncResolver> dbResolver,
- THashMap<std::pair<TString, NYql::DatabaseType>, NYql::TDatabaseAuth>& databaseIds)
+ THashMap<std::pair<TString, NYql::EDatabaseType>, NYql::TDatabaseAuth>& databaseIds)
{
TVector<TString> clusters(Reserve(config.ClusterMappingSize()));
for (auto& cluster: config.GetClusterMapping()) {
@@ -39,7 +39,7 @@ struct TClickHouseConfiguration : public TClickHouseSettings, public NCommon::TS
if (dbResolver) {
YQL_CLOG(DEBUG, ProviderClickHouse) << "Settings: clusterName = " << cluster.GetName()
<< ", clusterDbId = " << cluster.GetId() << ", cluster.GetCluster(): " << cluster.GetCluster() << ", HasCluster: " << (cluster.HasCluster() ? "TRUE" : "FALSE") ;
- databaseIds[std::make_pair(cluster.GetId(), NYql::DatabaseType::ClickHouse)] =
+ databaseIds[std::make_pair(cluster.GetId(), NYql::EDatabaseType::ClickHouse)] =
NYql::TDatabaseAuth{cluster.GetCHToken(), /*AddBearer=*/false};
if (cluster.GetId()) {
DbId2Clusters[cluster.GetId()].emplace_back(cluster.GetName());
diff --git a/ydb/library/yql/providers/common/db_id_async_resolver/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/providers/common/db_id_async_resolver/CMakeLists.darwin-x86_64.txt
index 7bbfc9a4c1a..df50b83d349 100644
--- a/ydb/library/yql/providers/common/db_id_async_resolver/CMakeLists.darwin-x86_64.txt
+++ b/ydb/library/yql/providers/common/db_id_async_resolver/CMakeLists.darwin-x86_64.txt
@@ -12,5 +12,6 @@ target_link_libraries(providers-common-db_id_async_resolver INTERFACE
contrib-libs-cxxsupp
yutil
cpp-threading-future
+ connector-api-common
yql-public-issue
)
diff --git a/ydb/library/yql/providers/common/db_id_async_resolver/CMakeLists.linux-aarch64.txt b/ydb/library/yql/providers/common/db_id_async_resolver/CMakeLists.linux-aarch64.txt
index 392cc3fcaea..6578ba76e03 100644
--- a/ydb/library/yql/providers/common/db_id_async_resolver/CMakeLists.linux-aarch64.txt
+++ b/ydb/library/yql/providers/common/db_id_async_resolver/CMakeLists.linux-aarch64.txt
@@ -13,5 +13,6 @@ target_link_libraries(providers-common-db_id_async_resolver INTERFACE
contrib-libs-cxxsupp
yutil
cpp-threading-future
+ connector-api-common
yql-public-issue
)
diff --git a/ydb/library/yql/providers/common/db_id_async_resolver/CMakeLists.linux-x86_64.txt b/ydb/library/yql/providers/common/db_id_async_resolver/CMakeLists.linux-x86_64.txt
index 392cc3fcaea..6578ba76e03 100644
--- a/ydb/library/yql/providers/common/db_id_async_resolver/CMakeLists.linux-x86_64.txt
+++ b/ydb/library/yql/providers/common/db_id_async_resolver/CMakeLists.linux-x86_64.txt
@@ -13,5 +13,6 @@ target_link_libraries(providers-common-db_id_async_resolver INTERFACE
contrib-libs-cxxsupp
yutil
cpp-threading-future
+ connector-api-common
yql-public-issue
)
diff --git a/ydb/library/yql/providers/common/db_id_async_resolver/CMakeLists.windows-x86_64.txt b/ydb/library/yql/providers/common/db_id_async_resolver/CMakeLists.windows-x86_64.txt
index 7bbfc9a4c1a..df50b83d349 100644
--- a/ydb/library/yql/providers/common/db_id_async_resolver/CMakeLists.windows-x86_64.txt
+++ b/ydb/library/yql/providers/common/db_id_async_resolver/CMakeLists.windows-x86_64.txt
@@ -12,5 +12,6 @@ target_link_libraries(providers-common-db_id_async_resolver INTERFACE
contrib-libs-cxxsupp
yutil
cpp-threading-future
+ connector-api-common
yql-public-issue
)
diff --git a/ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver.h b/ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver.h
index 25db8049952..4a58980d9d0 100644
--- a/ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver.h
+++ b/ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver.h
@@ -2,18 +2,40 @@
#include <library/cpp/threading/future/future.h>
#include <util/string/builder.h>
+#include <ydb/library/yql/providers/generic/connector/api/common/data_source.pb.h>
#include <ydb/library/yql/public/issue/yql_issue.h>
namespace NYql {
-enum class DatabaseType {
+enum class EDatabaseType {
Ydb,
ClickHouse,
DataStreams,
ObjectStorage,
- Generic
+ PostgreSQL
};
+inline EDatabaseType DataSourceKindToDatabaseType(NConnector::NApi::EDataSourceKind dataSourceKind) {
+ switch (dataSourceKind) {
+ case NConnector::NApi::EDataSourceKind::POSTGRESQL:
+ return EDatabaseType::PostgreSQL;
+ case NConnector::NApi::EDataSourceKind::CLICKHOUSE:
+ return EDatabaseType::ClickHouse;
+ default:
+ ythrow yexception() << TStringBuf() << "Unknown data source kind: " << NConnector::NApi::EDataSourceKind_Name(dataSourceKind);
+ }
+}
+
+inline TString DatabaseTypeToString(EDatabaseType databaseType) {
+ switch (databaseType) {
+ case EDatabaseType::ClickHouse:
+ return "clickhouse";
+ case EDatabaseType::PostgreSQL:
+ return "postgresql";
+ default:
+ ythrow yexception() << TStringBuf() << "Unknown database type: " << int(databaseType);
+ }
+}
struct TDatabaseAuth {
TString StructuredToken;
@@ -27,7 +49,7 @@ struct TDatabaseAuth {
}
};
-struct TDbResolverResponse {
+struct TDatabaseResolverResponse {
struct TEndpoint {
std::tuple<TString, ui32> ParseHostPort() const {
@@ -47,11 +69,11 @@ struct TDbResolverResponse {
bool Secure = false;
};
- using TDatabaseEndpointsMap = THashMap<std::pair<TString, DatabaseType>, TEndpoint>;
+ using TDatabaseEndpointsMap = THashMap<std::pair<TString, EDatabaseType>, TEndpoint>;
- TDbResolverResponse() = default;
+ TDatabaseResolverResponse() = default;
- TDbResolverResponse(
+ TDatabaseResolverResponse(
TDatabaseEndpointsMap&& databaseId2Endpoint,
bool success = false,
const NYql::TIssues& issues = {})
@@ -66,9 +88,9 @@ struct TDbResolverResponse {
class IDatabaseAsyncResolver {
public:
- using TDatabaseAuthMap = THashMap<std::pair<TString, DatabaseType>, NYql::TDatabaseAuth>;
+ using TDatabaseAuthMap = THashMap<std::pair<TString, EDatabaseType>, NYql::TDatabaseAuth>;
- virtual NThreading::TFuture<NYql::TDbResolverResponse> ResolveIds(const TDatabaseAuthMap& ids) const = 0;
+ virtual NThreading::TFuture<NYql::TDatabaseResolverResponse> ResolveIds(const TDatabaseAuthMap& ids) const = 0;
virtual ~IDatabaseAsyncResolver() = default;
};
diff --git a/ydb/library/yql/providers/common/db_id_async_resolver/mdb_host_transformer.h b/ydb/library/yql/providers/common/db_id_async_resolver/mdb_host_transformer.h
new file mode 100644
index 00000000000..188f739f984
--- /dev/null
+++ b/ydb/library/yql/providers/common/db_id_async_resolver/mdb_host_transformer.h
@@ -0,0 +1,17 @@
+#pragma once
+
+#include "db_async_resolver.h"
+
+namespace NYql {
+ // IMdbHostTransformer is responsible for transforming the managed database instance host name
+ // into endpoint (`fqdn:port`) to establish network connection with data source.
+ // The host names are obtained from MDB API, for example:
+ // https://cloud.yandex.ru/docs/managed-clickhouse/api-ref/Cluster/listHosts
+ class IMdbHostTransformer {
+ public:
+ using TPtr = std::shared_ptr<IMdbHostTransformer>;
+
+ virtual TString ToEndpoint(const NYql::EDatabaseType databaseType, const TString& mdbHost) const = 0;
+ virtual ~IMdbHostTransformer() = default;
+ };
+}
diff --git a/ydb/library/yql/providers/common/db_id_async_resolver/ya.make b/ydb/library/yql/providers/common/db_id_async_resolver/ya.make
index 2cf88d4759e..9e226a741ad 100644
--- a/ydb/library/yql/providers/common/db_id_async_resolver/ya.make
+++ b/ydb/library/yql/providers/common/db_id_async_resolver/ya.make
@@ -2,10 +2,12 @@ LIBRARY()
SRCS(
db_async_resolver.h
+ mdb_host_transformer.h
)
PEERDIR(
library/cpp/threading/future
+ ydb/library/yql/providers/generic/connector/api/common
ydb/library/yql/public/issue
)
diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_datasource_type_ann.cpp b/ydb/library/yql/providers/generic/provider/yql_generic_datasource_type_ann.cpp
index f05688124d5..32c4c52068b 100644
--- a/ydb/library/yql/providers/generic/provider/yql_generic_datasource_type_ann.cpp
+++ b/ydb/library/yql/providers/generic/provider/yql_generic_datasource_type_ann.cpp
@@ -51,7 +51,7 @@ namespace NYql {
ctx.MakeType<TItemExprType>(item->GetName(), ctx.MakeType<TBlockExprType>(item->GetItemType())));
}
- // FIXME:
+ // FIXME: YQ-2190
// Clickhouse provider used to work with multiple tables simultaneously;
// I don't know what to do with others.
break;
diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_io_discovery.cpp b/ydb/library/yql/providers/generic/provider/yql_generic_io_discovery.cpp
index aa8ba068eaa..279fe4fb606 100644
--- a/ydb/library/yql/providers/generic/provider/yql_generic_io_discovery.cpp
+++ b/ydb/library/yql/providers/generic/provider/yql_generic_io_discovery.cpp
@@ -29,7 +29,7 @@ namespace NYql {
if (!State_->DbResolver)
return TStatus::Ok;
- THashMap<std::pair<TString, NYql::DatabaseType>, NYql::TDatabaseAuth> ids;
+ THashMap<std::pair<TString, NYql::EDatabaseType>, NYql::TDatabaseAuth> ids;
if (auto reads = FindNodes(input,
[&](const TExprNode::TPtr& node) {
const TExprBase nodeExpr(node);
@@ -48,10 +48,11 @@ namespace NYql {
const auto clusterName = read.DataSource().Cluster().StringValue();
YQL_CLOG(DEBUG, ProviderGeneric) << "found cluster name: " << clusterName;
- auto databaseId = State_->Configuration->ClusterNamesToClusterConfigs[clusterName].GetDatabaseId();
+ auto& cluster = State_->Configuration->ClusterNamesToClusterConfigs[clusterName];
+ auto databaseId = cluster.GetDatabaseId();
if (databaseId) {
YQL_CLOG(DEBUG, ProviderGeneric) << "found database id: " << databaseId;
- const auto idKey = std::make_pair(databaseId, NYql::DatabaseType::Generic);
+ const auto idKey = std::make_pair(databaseId, DataSourceKindToDatabaseType(cluster.GetKind()));
const auto iter = State_->DatabaseIds.find(idKey);
if (iter != State_->DatabaseIds.end()) {
YQL_CLOG(DEBUG, ProviderGeneric) << "resolve database id: " << databaseId;
@@ -68,7 +69,7 @@ namespace NYql {
// FIXME: overengineered code - instead of using weak_ptr, directly copy shared_ptr in callback in this way:
// Apply([response = DbResolverResponse_](...))
- const std::weak_ptr<NYql::TDbResolverResponse> response = DbResolverResponse_;
+ const std::weak_ptr<NYql::TDatabaseResolverResponse> response = DbResolverResponse_;
AsyncFuture_ = State_->DbResolver->ResolveIds(ids).Apply([response](auto future) {
if (const auto res = response.lock()) {
*res = std::move(future.ExtractValue());
@@ -93,7 +94,7 @@ namespace NYql {
// Copy resolver results and reallocate pointer
auto databaseIdsToEndpointsResolved = std::move(DbResolverResponse_->DatabaseId2Endpoint);
- DbResolverResponse_ = std::make_shared<NYql::TDbResolverResponse>();
+ DbResolverResponse_ = std::make_shared<NYql::TDatabaseResolverResponse>();
// Modify cluster configs with resolved ids
return ModifyClusterConfigs(databaseIdsToEndpointsResolved, ctx);
@@ -101,11 +102,11 @@ namespace NYql {
void Rewind() final {
AsyncFuture_ = {};
- DbResolverResponse_.reset(new NYql::TDbResolverResponse);
+ DbResolverResponse_.reset(new NYql::TDatabaseResolverResponse);
}
private:
- TStatus ModifyClusterConfigs(const TDbResolverResponse::TDatabaseEndpointsMap& databaseIdsToEndpoints, TExprContext& ctx) {
+ TStatus ModifyClusterConfigs(const TDatabaseResolverResponse::TDatabaseEndpointsMap& databaseIdsToEndpoints, TExprContext& ctx) {
const auto& databaseIdsToClusterNames = State_->Configuration->DatabaseIdsToClusterNames;
auto& clusterNamesToClusterConfigs = State_->Configuration->ClusterNamesToClusterConfigs;
@@ -147,7 +148,7 @@ namespace NYql {
const TGenericState::TPtr State_;
NThreading::TFuture<void> AsyncFuture_;
- std::shared_ptr<NYql::TDbResolverResponse> DbResolverResponse_ = std::make_shared<NYql::TDbResolverResponse>();
+ std::shared_ptr<NYql::TDatabaseResolverResponse> DbResolverResponse_ = std::make_shared<NYql::TDatabaseResolverResponse>();
};
}
diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_settings.h b/ydb/library/yql/providers/generic/provider/yql_generic_settings.h
index b493fc3d505..b6e043e4f2d 100644
--- a/ydb/library/yql/providers/generic/provider/yql_generic_settings.h
+++ b/ydb/library/yql/providers/generic/provider/yql_generic_settings.h
@@ -58,7 +58,7 @@ namespace NYql {
if (dbResolver && databaseId) {
const auto token = MakeStructuredToken(cluster, credentials);
- databaseIds[std::make_pair(databaseId, NYql::DatabaseType::Generic)] = NYql::TDatabaseAuth{token, /*AddBearer=*/true};
+ databaseIds[std::make_pair(databaseId, DataSourceKindToDatabaseType(cluster.GetKind()))] = NYql::TDatabaseAuth{token, /*AddBearer=*/true};
DatabaseIdsToClusterNames[databaseId].emplace_back(clusterName);
YQL_CLOG(DEBUG, ProviderGeneric) << "database id '" << databaseId << "' added to mapping";
diff --git a/ydb/library/yql/providers/pq/provider/yql_pq_datasink_io_discovery.cpp b/ydb/library/yql/providers/pq/provider/yql_pq_datasink_io_discovery.cpp
index e5437828e24..9608c056021 100644
--- a/ydb/library/yql/providers/pq/provider/yql_pq_datasink_io_discovery.cpp
+++ b/ydb/library/yql/providers/pq/provider/yql_pq_datasink_io_discovery.cpp
@@ -12,7 +12,7 @@ namespace {
using namespace NNodes;
class TPqDataSinkIODiscoveryTransformer : public TGraphTransformerBase {
-using TDbId2Endpoint = THashMap<std::pair<TString, NYql::DatabaseType>, NYql::TDbResolverResponse::TEndpoint>;
+using TDbId2Endpoint = THashMap<std::pair<TString, NYql::EDatabaseType>, NYql::TDatabaseResolverResponse::TEndpoint>;
public:
explicit TPqDataSinkIODiscoveryTransformer(TPqState::TPtr state)
: State_(state)
@@ -28,13 +28,13 @@ public:
if (!State_->DbResolver)
return TStatus::Ok;
- THashMap<std::pair<TString, NYql::DatabaseType>, NYql::TDatabaseAuth> ids;
+ THashMap<std::pair<TString, NYql::EDatabaseType>, NYql::TDatabaseAuth> ids;
FindYdsDbIdsForResolving(State_, input, ids);
if (ids.empty())
return TStatus::Ok;
- const std::weak_ptr<NYql::TDbResolverResponse> response = DbResolverResponse_;
+ const std::weak_ptr<NYql::TDatabaseResolverResponse> response = DbResolverResponse_;
AsyncFuture_ = State_->DbResolver->ResolveIds(ids).Apply([response](auto future)
{
if (const auto res = response.lock())
@@ -55,7 +55,7 @@ public:
return TStatus::Error;
}
FullResolvedIds_.insert(DbResolverResponse_->DatabaseId2Endpoint.begin(), DbResolverResponse_->DatabaseId2Endpoint.end());
- DbResolverResponse_ = std::make_shared<NYql::TDbResolverResponse>();
+ DbResolverResponse_ = std::make_shared<NYql::TDatabaseResolverResponse>();
FillSettingsWithResolvedYdsIds(State_, FullResolvedIds_);
return TStatus::Ok;
}
@@ -63,14 +63,14 @@ public:
void Rewind() final {
AsyncFuture_ = {};
FullResolvedIds_.clear();
- DbResolverResponse_ = std::make_shared<NYql::TDbResolverResponse>();
+ DbResolverResponse_ = std::make_shared<NYql::TDatabaseResolverResponse>();
}
private:
const TPqState::TPtr State_;
NThreading::TFuture<void> AsyncFuture_;
TDbId2Endpoint FullResolvedIds_;
- std::shared_ptr<NYql::TDbResolverResponse> DbResolverResponse_ = std::make_shared<NYql::TDbResolverResponse>();
+ std::shared_ptr<NYql::TDatabaseResolverResponse> DbResolverResponse_ = std::make_shared<NYql::TDatabaseResolverResponse>();
};
}
diff --git a/ydb/library/yql/providers/pq/provider/yql_pq_helpers.cpp b/ydb/library/yql/providers/pq/provider/yql_pq_helpers.cpp
index 7bc56eef366..d845e40e90b 100644
--- a/ydb/library/yql/providers/pq/provider/yql_pq_helpers.cpp
+++ b/ydb/library/yql/providers/pq/provider/yql_pq_helpers.cpp
@@ -29,7 +29,7 @@ TCoNameValueTupleList BuildTopicPropsList(const TPqState::TTopicMeta& meta, TPos
void FindYdsDbIdsForResolving(
const TPqState::TPtr& state,
TExprNode::TPtr input,
- THashMap<std::pair<TString, NYql::DatabaseType>, NYql::TDatabaseAuth>& ids)
+ THashMap<std::pair<TString, NYql::EDatabaseType>, NYql::TDatabaseAuth>& ids)
{
if (auto pqNodes = FindNodes(input, [&](const TExprNode::TPtr& node) {
if (auto maybePqRead = TMaybeNode<TPqRead>(node)) {
@@ -67,7 +67,7 @@ void FindYdsDbIdsForResolving(
if (!foundSetting->second.DatabaseId)
continue;
YQL_CLOG(INFO, ProviderPq) << "Resolve YDS id: " << foundSetting->second.DatabaseId;
- const auto idKey = std::make_pair(foundSetting->second.DatabaseId, NYql::DatabaseType::DataStreams);
+ const auto idKey = std::make_pair(foundSetting->second.DatabaseId, NYql::EDatabaseType::DataStreams);
const auto foundDbId = state->DatabaseIds.find(idKey);
if (foundDbId != state->DatabaseIds.end()) {
ids[idKey] = foundDbId->second;
@@ -79,7 +79,7 @@ void FindYdsDbIdsForResolving(
void FillSettingsWithResolvedYdsIds(
const TPqState::TPtr& state,
- const THashMap<std::pair<TString, NYql::DatabaseType>, NYql::TDbResolverResponse::TEndpoint>& fullResolvedIds)
+ const TDatabaseResolverResponse::TDatabaseEndpointsMap& fullResolvedIds)
{
YQL_CLOG(INFO, ProviderPq) << "FullResolvedIds size: " << fullResolvedIds.size();
auto& clusters = state->Configuration->ClustersConfigurationSettings;
diff --git a/ydb/library/yql/providers/pq/provider/yql_pq_helpers.h b/ydb/library/yql/providers/pq/provider/yql_pq_helpers.h
index 7804e63a34c..8cd752b3bbe 100644
--- a/ydb/library/yql/providers/pq/provider/yql_pq_helpers.h
+++ b/ydb/library/yql/providers/pq/provider/yql_pq_helpers.h
@@ -15,10 +15,10 @@ NNodes::TCoNameValueTupleList BuildTopicPropsList(const TPqState::TTopicMeta& me
void FindYdsDbIdsForResolving(
const TPqState::TPtr& state,
TExprNode::TPtr input,
- THashMap<std::pair<TString, NYql::DatabaseType>, NYql::TDatabaseAuth>& ids);
+ THashMap<std::pair<TString, NYql::EDatabaseType>, NYql::TDatabaseAuth>& ids);
void FillSettingsWithResolvedYdsIds(
const TPqState::TPtr& state,
- const THashMap<std::pair<TString, NYql::DatabaseType>, NYql::TDbResolverResponse::TEndpoint>& fullResolvedIds);
+ const TDatabaseResolverResponse::TDatabaseEndpointsMap& fullResolvedIds);
} // namespace NYql
diff --git a/ydb/library/yql/providers/pq/provider/yql_pq_io_discovery.cpp b/ydb/library/yql/providers/pq/provider/yql_pq_io_discovery.cpp
index 4eef17a78d7..cc6939426e1 100644
--- a/ydb/library/yql/providers/pq/provider/yql_pq_io_discovery.cpp
+++ b/ydb/library/yql/providers/pq/provider/yql_pq_io_discovery.cpp
@@ -13,7 +13,7 @@ using namespace NNodes;
class TPqIODiscoveryTransformer : public TGraphTransformerBase {
-using TDbId2Endpoint = THashMap<std::pair<TString, NYql::DatabaseType>, NYql::TDbResolverResponse::TEndpoint>;
+using TDbId2Endpoint = THashMap<std::pair<TString, NYql::EDatabaseType>, NYql::TDatabaseResolverResponse::TEndpoint>;
public:
explicit TPqIODiscoveryTransformer(TPqState::TPtr state)
@@ -30,13 +30,13 @@ public:
if (!State_->DbResolver)
return TStatus::Ok;
- THashMap<std::pair<TString, NYql::DatabaseType>, NYql::TDatabaseAuth> ids;
+ THashMap<std::pair<TString, NYql::EDatabaseType>, NYql::TDatabaseAuth> ids;
FindYdsDbIdsForResolving(State_, input, ids);
if (ids.empty())
return TStatus::Ok;
- const std::weak_ptr<NYql::TDbResolverResponse> response = DbResolverResponse_;
+ const std::weak_ptr<NYql::TDatabaseResolverResponse> response = DbResolverResponse_;
AsyncFuture_ = State_->DbResolver->ResolveIds(ids).Apply([response](auto future)
{
if (const auto res = response.lock())
@@ -57,7 +57,7 @@ public:
return TStatus::Error;
}
FullResolvedIds_.insert(DbResolverResponse_->DatabaseId2Endpoint.begin(), DbResolverResponse_->DatabaseId2Endpoint.end());
- DbResolverResponse_ = std::make_shared<NYql::TDbResolverResponse>();
+ DbResolverResponse_ = std::make_shared<NYql::TDatabaseResolverResponse>();
FillSettingsWithResolvedYdsIds(State_, FullResolvedIds_);
return TStatus::Ok;
}
@@ -65,14 +65,14 @@ public:
void Rewind() final {
AsyncFuture_ = {};
FullResolvedIds_.clear();
- DbResolverResponse_ = std::make_shared<NYql::TDbResolverResponse>();
+ DbResolverResponse_ = std::make_shared<NYql::TDatabaseResolverResponse>();
}
private:
const TPqState::TPtr State_;
NThreading::TFuture<void> AsyncFuture_;
TDbId2Endpoint FullResolvedIds_;
- std::shared_ptr<NYql::TDbResolverResponse> DbResolverResponse_ = std::make_shared<NYql::TDbResolverResponse>();
+ std::shared_ptr<NYql::TDatabaseResolverResponse> DbResolverResponse_ = std::make_shared<NYql::TDatabaseResolverResponse>();
};
}
diff --git a/ydb/library/yql/providers/pq/provider/yql_pq_provider.h b/ydb/library/yql/providers/pq/provider/yql_pq_provider.h
index fa6d6077ec8..9819e3f3c5f 100644
--- a/ydb/library/yql/providers/pq/provider/yql_pq_provider.h
+++ b/ydb/library/yql/providers/pq/provider/yql_pq_provider.h
@@ -52,7 +52,7 @@ public:
const NKikimr::NMiniKQL::IFunctionRegistry* FunctionRegistry = nullptr;
IPqGateway::TPtr Gateway;
THolder<IDqIntegration> DqIntegration;
- THashMap<std::pair<TString, NYql::DatabaseType>, NYql::TDatabaseAuth> DatabaseIds;
+ THashMap<std::pair<TString, NYql::EDatabaseType>, NYql::TDatabaseAuth> DatabaseIds;
std::shared_ptr<NYql::IDatabaseAsyncResolver> DbResolver;
};
diff --git a/ydb/library/yql/providers/pq/provider/yql_pq_settings.cpp b/ydb/library/yql/providers/pq/provider/yql_pq_settings.cpp
index ea2d32abd9e..c424fa9d0e9 100644
--- a/ydb/library/yql/providers/pq/provider/yql_pq_settings.cpp
+++ b/ydb/library/yql/providers/pq/provider/yql_pq_settings.cpp
@@ -18,7 +18,7 @@ void TPqConfiguration::Init(
const TPqGatewayConfig& config,
TIntrusivePtr<TTypeAnnotationContext> typeCtx,
const std::shared_ptr<NYql::IDatabaseAsyncResolver> dbResolver,
- THashMap<std::pair<TString, NYql::DatabaseType>, NYql::TDatabaseAuth>& databaseIds)
+ THashMap<std::pair<TString, NYql::EDatabaseType>, NYql::TDatabaseAuth>& databaseIds)
{
TVector<TString> clusters(Reserve(config.ClusterMappingSize()));
for (auto& cluster: config.GetClusterMapping()) {
@@ -52,7 +52,7 @@ void TPqConfiguration::Init(
YQL_CLOG(DEBUG, ProviderPq) << "Settings: clusterName = " << cluster.GetName()
<< ", clusterDbId = " << cluster.GetDatabaseId() << ", cluster.GetEndpoint(): " << cluster.GetEndpoint() << ", HasEndpoint = " << (cluster.HasEndpoint() ? "TRUE" : "FALSE") ;
if (cluster.GetDatabaseId()) {
- databaseIds[std::make_pair(cluster.GetDatabaseId(), NYql::DatabaseType::DataStreams)] =
+ databaseIds[std::make_pair(cluster.GetDatabaseId(), NYql::EDatabaseType::DataStreams)] =
NYql::TDatabaseAuth{structuredTokenJson, cluster.GetAddBearerToToken()};
DbId2Clusters[cluster.GetDatabaseId()].emplace_back(cluster.GetName());
YQL_CLOG(DEBUG, ProviderPq) << "Add dbId: " << cluster.GetDatabaseId() << " to DbId2Clusters";
diff --git a/ydb/library/yql/providers/pq/provider/yql_pq_settings.h b/ydb/library/yql/providers/pq/provider/yql_pq_settings.h
index 382af9c82b7..a720506ef12 100644
--- a/ydb/library/yql/providers/pq/provider/yql_pq_settings.h
+++ b/ydb/library/yql/providers/pq/provider/yql_pq_settings.h
@@ -41,7 +41,7 @@ struct TPqConfiguration : public TPqSettings, public NCommon::TSettingDispatcher
const TPqGatewayConfig& config,
TIntrusivePtr<TTypeAnnotationContext> typeCtx,
const std::shared_ptr<NYql::IDatabaseAsyncResolver> dbResolver,
- THashMap<std::pair<TString, NYql::DatabaseType>, NYql::TDatabaseAuth>& databaseIds);
+ THashMap<std::pair<TString, NYql::EDatabaseType>, NYql::TDatabaseAuth>& databaseIds);
TString GetDatabaseForTopic(const TString& cluster) const;
diff --git a/ydb/library/yql/providers/ydb/provider/yql_ydb_io_discovery.cpp b/ydb/library/yql/providers/ydb/provider/yql_ydb_io_discovery.cpp
index 1aea23417fe..28ecf98338e 100644
--- a/ydb/library/yql/providers/ydb/provider/yql_ydb_io_discovery.cpp
+++ b/ydb/library/yql/providers/ydb/provider/yql_ydb_io_discovery.cpp
@@ -17,7 +17,7 @@ namespace {
using namespace NNodes;
class TYdbIODiscoveryTransformer : public TGraphTransformerBase {
-using TDbId2Endpoint = THashMap<std::pair<TString, NYql::DatabaseType>, NYql::TDbResolverResponse::TEndpoint>;
+using TDbId2Endpoint = THashMap<std::pair<TString, NYql::EDatabaseType>, NYql::TDatabaseResolverResponse::TEndpoint>;
public:
TYdbIODiscoveryTransformer(TYdbState::TPtr state)
: State_(std::move(state))
@@ -32,7 +32,7 @@ public:
if (!State_->DbResolver)
return TStatus::Ok;
- THashMap<std::pair<TString, NYql::DatabaseType>, NYql::TDatabaseAuth> ids;
+ THashMap<std::pair<TString, NYql::EDatabaseType>, NYql::TDatabaseAuth> ids;
if (auto reads = FindNodes(input, [&](const TExprNode::TPtr& node) {
const TExprBase nodeExpr(node);
if (!nodeExpr.Maybe<TYdbRead>())
@@ -47,7 +47,7 @@ public:
const TYdbRead read(node);
const auto& cluster = read.DataSource().Cluster().StringValue();
const auto& dbId = State_->Configuration->Clusters[cluster].DatabaseId;
- const auto idKey = std::make_pair(dbId, NYql::DatabaseType::Ydb);
+ const auto idKey = std::make_pair(dbId, NYql::EDatabaseType::Ydb);
const auto iter = State_->DatabaseIds.find(idKey);
if (iter != State_->DatabaseIds.end()) {
ids[idKey] = iter->second;
@@ -57,7 +57,7 @@ public:
if (ids.empty()) {
return TStatus::Ok;
}
- const std::weak_ptr<NYql::TDbResolverResponse> response = DbResolverResponse_;
+ const std::weak_ptr<NYql::TDatabaseResolverResponse> response = DbResolverResponse_;
AsyncFuture_ = State_->DbResolver->ResolveIds(ids).Apply([response](auto future)
{
if (const auto res = response.lock())
@@ -78,7 +78,7 @@ public:
return TStatus::Error;
}
FullResolvedIds_.insert(DbResolverResponse_->DatabaseId2Endpoint.begin(), DbResolverResponse_->DatabaseId2Endpoint.end());
- DbResolverResponse_ = std::make_shared<NYql::TDbResolverResponse>();
+ DbResolverResponse_ = std::make_shared<NYql::TDatabaseResolverResponse>();
auto& clusters = State_->Configuration->Clusters;
const auto& id2Clusters = State_->Configuration->DbId2Clusters;
for (const auto& [dbIdWithType, info] : FullResolvedIds_) {
@@ -100,14 +100,14 @@ public:
void Rewind() final {
AsyncFuture_ = {};
FullResolvedIds_.clear();
- DbResolverResponse_ = std::make_shared<NYql::TDbResolverResponse>();
+ DbResolverResponse_ = std::make_shared<NYql::TDatabaseResolverResponse>();
}
private:
const TYdbState::TPtr State_;
NThreading::TFuture<void> AsyncFuture_;
TDbId2Endpoint FullResolvedIds_;
- std::shared_ptr<NYql::TDbResolverResponse> DbResolverResponse_ = std::make_shared<NYql::TDbResolverResponse>();
+ std::shared_ptr<NYql::TDatabaseResolverResponse> DbResolverResponse_ = std::make_shared<NYql::TDatabaseResolverResponse>();
};
}
diff --git a/ydb/library/yql/providers/ydb/provider/yql_ydb_provider.h b/ydb/library/yql/providers/ydb/provider/yql_ydb_provider.h
index 1135c62753a..d5b00a35036 100644
--- a/ydb/library/yql/providers/ydb/provider/yql_ydb_provider.h
+++ b/ydb/library/yql/providers/ydb/provider/yql_ydb_provider.h
@@ -33,7 +33,7 @@ struct TYdbState : public TThrRefBase
TYdbConfiguration::TPtr Configuration = MakeIntrusive<TYdbConfiguration>();
const NKikimr::NMiniKQL::IFunctionRegistry* FunctionRegistry = nullptr;
ISecuredServiceAccountCredentialsFactory::TPtr CredentialsFactory;
- THashMap<std::pair<TString, NYql::DatabaseType>, NYql::TDatabaseAuth> DatabaseIds;
+ THashMap<std::pair<TString, NYql::EDatabaseType>, NYql::TDatabaseAuth> DatabaseIds;
std::shared_ptr<NYql::IDatabaseAsyncResolver> DbResolver;
};
diff --git a/ydb/library/yql/providers/ydb/provider/yql_ydb_settings.cpp b/ydb/library/yql/providers/ydb/provider/yql_ydb_settings.cpp
index 8c170078f54..7c088646b61 100644
--- a/ydb/library/yql/providers/ydb/provider/yql_ydb_settings.cpp
+++ b/ydb/library/yql/providers/ydb/provider/yql_ydb_settings.cpp
@@ -21,7 +21,7 @@ void TYdbConfiguration::Init(
const TYdbGatewayConfig& config,
TIntrusivePtr<TTypeAnnotationContext> typeCtx,
const std::shared_ptr<NYql::IDatabaseAsyncResolver> dbResolver,
- THashMap<std::pair<TString, NYql::DatabaseType>, NYql::TDatabaseAuth>& databaseIds)
+ THashMap<std::pair<TString, NYql::EDatabaseType>, NYql::TDatabaseAuth>& databaseIds)
{
TVector<TString> clusters(Reserve(config.ClusterMappingSize()));
for (auto& cluster: config.GetClusterMapping()) {
@@ -43,7 +43,7 @@ void TYdbConfiguration::Init(
if (dbResolver) {
if (cluster.GetId()) {
- databaseIds[std::make_pair(cluster.GetId(), NYql::DatabaseType::Ydb)] =
+ databaseIds[std::make_pair(cluster.GetId(), NYql::EDatabaseType::Ydb)] =
NYql::TDatabaseAuth{structuredTokenJson, cluster.GetAddBearerToToken()};
DbId2Clusters[cluster.GetId()].emplace_back(cluster.GetName());
}
diff --git a/ydb/library/yql/providers/ydb/provider/yql_ydb_settings.h b/ydb/library/yql/providers/ydb/provider/yql_ydb_settings.h
index 1d72a52cd34..ee2bc0df18c 100644
--- a/ydb/library/yql/providers/ydb/provider/yql_ydb_settings.h
+++ b/ydb/library/yql/providers/ydb/provider/yql_ydb_settings.h
@@ -34,7 +34,7 @@ struct TYdbConfiguration : public TYdbSettings, public NCommon::TSettingDispatch
const TYdbGatewayConfig& config,
TIntrusivePtr<TTypeAnnotationContext> typeCtx,
const std::shared_ptr<NYql::IDatabaseAsyncResolver> dbResolver,
- THashMap<std::pair<TString, NYql::DatabaseType>, NYql::TDatabaseAuth>& databaseIds
+ THashMap<std::pair<TString, NYql::EDatabaseType>, NYql::TDatabaseAuth>& databaseIds
);
bool HasCluster(TStringBuf cluster) const;
diff --git a/ydb/services/fq/ut_integration/CMakeLists.darwin-x86_64.txt b/ydb/services/fq/ut_integration/CMakeLists.darwin-x86_64.txt
index 00b63b10631..48945309966 100644
--- a/ydb/services/fq/ut_integration/CMakeLists.darwin-x86_64.txt
+++ b/ydb/services/fq/ut_integration/CMakeLists.darwin-x86_64.txt
@@ -25,6 +25,7 @@ target_link_libraries(ydb-services-fq-ut_integration PUBLIC
cpp-regex-pcre
library-cpp-svnversion
fq-libs-control_plane_storage
+ fq-libs-db_id_async_resolver_impl
fq-libs-db_schema
fq-libs-private_client
core-testlib-default
diff --git a/ydb/services/fq/ut_integration/CMakeLists.linux-aarch64.txt b/ydb/services/fq/ut_integration/CMakeLists.linux-aarch64.txt
index 971833aa8c9..365ca27a222 100644
--- a/ydb/services/fq/ut_integration/CMakeLists.linux-aarch64.txt
+++ b/ydb/services/fq/ut_integration/CMakeLists.linux-aarch64.txt
@@ -25,6 +25,7 @@ target_link_libraries(ydb-services-fq-ut_integration PUBLIC
cpp-regex-pcre
library-cpp-svnversion
fq-libs-control_plane_storage
+ fq-libs-db_id_async_resolver_impl
fq-libs-db_schema
fq-libs-private_client
core-testlib-default
diff --git a/ydb/services/fq/ut_integration/CMakeLists.linux-x86_64.txt b/ydb/services/fq/ut_integration/CMakeLists.linux-x86_64.txt
index 2cb80bd0a8f..42f746e5097 100644
--- a/ydb/services/fq/ut_integration/CMakeLists.linux-x86_64.txt
+++ b/ydb/services/fq/ut_integration/CMakeLists.linux-x86_64.txt
@@ -26,6 +26,7 @@ target_link_libraries(ydb-services-fq-ut_integration PUBLIC
cpp-regex-pcre
library-cpp-svnversion
fq-libs-control_plane_storage
+ fq-libs-db_id_async_resolver_impl
fq-libs-db_schema
fq-libs-private_client
core-testlib-default
diff --git a/ydb/services/fq/ut_integration/CMakeLists.windows-x86_64.txt b/ydb/services/fq/ut_integration/CMakeLists.windows-x86_64.txt
index 354170ff722..ed7bc57c8dd 100644
--- a/ydb/services/fq/ut_integration/CMakeLists.windows-x86_64.txt
+++ b/ydb/services/fq/ut_integration/CMakeLists.windows-x86_64.txt
@@ -25,6 +25,7 @@ target_link_libraries(ydb-services-fq-ut_integration PUBLIC
cpp-regex-pcre
library-cpp-svnversion
fq-libs-control_plane_storage
+ fq-libs-db_id_async_resolver_impl
fq-libs-db_schema
fq-libs-private_client
core-testlib-default
diff --git a/ydb/services/fq/ut_integration/fq_ut.cpp b/ydb/services/fq/ut_integration/fq_ut.cpp
index a2ada19c336..a13fd6f13b5 100644
--- a/ydb/services/fq/ut_integration/fq_ut.cpp
+++ b/ydb/services/fq/ut_integration/fq_ut.cpp
@@ -9,6 +9,7 @@
#include <ydb/core/fq/libs/control_plane_storage/message_builders.h>
#include <ydb/core/fq/libs/actors/database_resolver.h>
+#include <ydb/core/fq/libs/db_id_async_resolver_impl/mdb_host_transformer.h>
#include <ydb/library/yql/public/issue/yql_issue_message.h>
@@ -858,9 +859,21 @@ Y_UNIT_TEST_SUITE(Yq_1) {
Y_UNIT_TEST_SUITE(Yq_2) {
// use fork for data test due to ch initialization problem
Y_UNIT_TEST(Test_HostNameTrasformation) {
- UNIT_ASSERT_VALUES_EQUAL(::NFq::TransformMdbHostToCorrectFormat("rc1c-p5waby2y5y1kb5ue.mdb.yandexcloud.net"), "rc1c-p5waby2y5y1kb5ue.db.yandex.net:8443");
- UNIT_ASSERT_VALUES_EQUAL(::NFq::TransformMdbHostToCorrectFormat("xxx.xxx"), "xxx.db.yandex.net:8443");
- UNIT_ASSERT_VALUES_EQUAL(::NFq::TransformMdbHostToCorrectFormat("host."), "host.db.yandex.net:8443");
+ {
+ auto transformer = ::NFq::MakeTMdbHostTransformerLegacy();
+ UNIT_ASSERT_VALUES_EQUAL(transformer->ToEndpoint(NYql::EDatabaseType::ClickHouse, "rc1c-p5waby2y5y1kb5ue.db.yandex.net"),
+ "rc1c-p5waby2y5y1kb5ue.db.yandex.net:8443");
+ UNIT_ASSERT_VALUES_EQUAL(transformer->ToEndpoint(NYql::EDatabaseType::ClickHouse, "ya.ru"),
+ "ya.db.yandex.net:8443");
+ }
+
+ {
+ auto transformer = ::NFq::MakeTMdbHostTransformerGeneric();
+ UNIT_ASSERT_VALUES_EQUAL(::NFq::MakeTMdbHostTransformerGeneric()->ToEndpoint(NYql::EDatabaseType::ClickHouse, "rc1a-d6dv17lv47v5mcop.mdb.yandexcloud.net"),
+ "rc1a-d6dv17lv47v5mcop.mdb.yandexcloud.net:9000");
+ UNIT_ASSERT_VALUES_EQUAL(::NFq::MakeTMdbHostTransformerGeneric()->ToEndpoint(NYql::EDatabaseType::PostgreSQL, "rc1b-eyt6dtobu96rwydq.mdb.yandexcloud.net"),
+ "rc1b-eyt6dtobu96rwydq.mdb.yandexcloud.net:6432");
+ }
}
SIMPLE_UNIT_FORKED_TEST(ReadFromYdbOverYq) {
diff --git a/ydb/services/fq/ut_integration/ya.make b/ydb/services/fq/ut_integration/ya.make
index 9f3f25f0a2a..2548e272a92 100644
--- a/ydb/services/fq/ut_integration/ya.make
+++ b/ydb/services/fq/ut_integration/ya.make
@@ -15,6 +15,7 @@ PEERDIR(
library/cpp/regex/pcre
library/cpp/svnversion
ydb/core/fq/libs/control_plane_storage
+ ydb/core/fq/libs/db_id_async_resolver_impl
ydb/core/fq/libs/db_schema
ydb/core/fq/libs/private_client
ydb/core/testlib/default