aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authordinmukhammed <dinmukhammed@yandex-team.ru>2022-02-16 14:46:33 +0300
committerdinmukhammed <dinmukhammed@yandex-team.ru>2022-02-16 14:46:33 +0300
commitf8105a7f3a524325d504a2b519f8f40fc109f6bb (patch)
tree17f6a691f17cf2e7ee011a8031760f1a8e0302c7
parent2f8ea7e2648c1624b5c1af79efe506096c4ce519 (diff)
downloadydb-f8105a7f3a524325d504a2b519f8f40fc109f6bb.tar.gz
YQ-831 Cut dependecy: YQL --> YQ
Fixed deps ref:09c66d3cc6f83868fa99039a01af629e24778a56
-rw-r--r--ydb/core/yq/libs/actors/database_resolver.cpp49
-rw-r--r--ydb/core/yq/libs/actors/run_actor.cpp2
-rw-r--r--ydb/core/yq/libs/actors/ya.make1
-rw-r--r--ydb/core/yq/libs/common/database_token_builder.cpp45
-rw-r--r--ydb/core/yq/libs/common/database_token_builder.h24
-rw-r--r--ydb/core/yq/libs/common/ya.make1
-rw-r--r--ydb/core/yq/libs/db_id_async_resolver_impl/db_async_resolver_impl.cpp (renamed from ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver_impl.cpp)6
-rw-r--r--ydb/core/yq/libs/db_id_async_resolver_impl/db_async_resolver_impl.h (renamed from ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver_impl.h)9
-rw-r--r--ydb/core/yq/libs/db_id_async_resolver_impl/ya.make18
-rw-r--r--ydb/core/yq/libs/events/events.h54
-rw-r--r--ydb/core/yq/libs/events/ya.make1
-rw-r--r--ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_io_discovery.cpp13
-rw-r--r--ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_provider.cpp2
-rw-r--r--ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_provider.h6
-rw-r--r--ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_settings.h10
-rw-r--r--ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver.h51
-rw-r--r--ydb/library/yql/providers/common/db_id_async_resolver/ya.make5
-rw-r--r--ydb/library/yql/providers/pq/provider/yql_pq_datasink_io_discovery.cpp10
-rw-r--r--ydb/library/yql/providers/pq/provider/yql_pq_helpers.cpp6
-rw-r--r--ydb/library/yql/providers/pq/provider/yql_pq_helpers.h4
-rw-r--r--ydb/library/yql/providers/pq/provider/yql_pq_io_discovery.cpp10
-rw-r--r--ydb/library/yql/providers/pq/provider/yql_pq_provider.cpp2
-rw-r--r--ydb/library/yql/providers/pq/provider/yql_pq_provider.h6
-rw-r--r--ydb/library/yql/providers/pq/provider/yql_pq_settings.cpp8
-rw-r--r--ydb/library/yql/providers/pq/provider/yql_pq_settings.h5
-rw-r--r--ydb/library/yql/providers/ydb/provider/yql_ydb_io_discovery.cpp14
-rw-r--r--ydb/library/yql/providers/ydb/provider/yql_ydb_provider.cpp2
-rw-r--r--ydb/library/yql/providers/ydb/provider/yql_ydb_provider.h6
-rw-r--r--ydb/library/yql/providers/ydb/provider/yql_ydb_settings.cpp8
-rw-r--r--ydb/library/yql/providers/ydb/provider/yql_ydb_settings.h5
30 files changed, 167 insertions, 216 deletions
diff --git a/ydb/core/yq/libs/actors/database_resolver.cpp b/ydb/core/yq/libs/actors/database_resolver.cpp
index 28402cb7d80..59558c22d2d 100644
--- a/ydb/core/yq/libs/actors/database_resolver.cpp
+++ b/ydb/core/yq/libs/actors/database_resolver.cpp
@@ -2,6 +2,7 @@
#include <ydb/core/yq/libs/events/events.h>
#include <ydb/core/yq/libs/common/cache.h>
+#include <ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver.h>
#include <library/cpp/actors/core/actor_bootstrapped.h>
#include <library/cpp/actors/http/http.h>
@@ -9,6 +10,7 @@
#include <library/cpp/json/json_reader.h>
#include <ydb/core/protos/services.pb.h>
+
#define LOG_E(stream) \
LOG_ERROR_S(*TlsActivationContext, NKikimrServices::YQL_PROXY, "DatabaseResolver - TraceId: " << TraceId << ": " << stream)
@@ -20,10 +22,10 @@ namespace NYq {
using namespace NActors;
using namespace NYql;
-using TEndpoint = TEvents::TDbResolverResponse::TEndpoint;
+using TEndpoint = NYql::TDbResolverResponse::TEndpoint;
-using TParsers = THashMap<DatabaseType, std::function<TEndpoint(NJson::TJsonValue& body, bool)>>;
-using TCache = TTtlCache<std::tuple<TString, DatabaseType, TEvents::TDatabaseAuth>, std::variant<TEndpoint, TString>>;
+using TParsers = THashMap<NYql::DatabaseType, std::function<TEndpoint(NJson::TJsonValue& body, bool)>>;
+using TCache = TTtlCache<std::tuple<TString, NYql::DatabaseType, NYql::TDatabaseAuth>, std::variant<TEndpoint, TString>>;
TString TransformMdbHostToCorrectFormat(const TString& mdbHost) {
return mdbHost.substr(0, mdbHost.find('.')) + ".db.yandex.net:8443";
@@ -39,8 +41,8 @@ public:
TResponseProcessor(
const TActorId sender,
TCache& cache,
- const THashMap<std::pair<TString, DatabaseType>, TEndpoint>& ready,
- const THashMap<NHttp::THttpOutgoingRequestPtr, std::tuple<TString, DatabaseType, TEvents::TDatabaseAuth>>& requests,
+ const THashMap<std::pair<TString, NYql::DatabaseType>, TEndpoint>& ready,
+ const THashMap<NHttp::THttpOutgoingRequestPtr, std::tuple<TString, NYql::DatabaseType, NYql::TDatabaseAuth>>& requests,
const TString& traceId,
bool mdbTransformHost,
const TParsers& parsers)
@@ -103,7 +105,7 @@ private:
}
Send(Sender,
new TEvents::TEvEndpointResponse(
- TEvents::TDbResolverResponse(std::move(DatabaseId2Endpoint), Success, issues)));
+ NYql::TDbResolverResponse(std::move(DatabaseId2Endpoint), Success, issues)));
PassAway();
}
@@ -112,8 +114,8 @@ private:
TString status;
TString errorMessage;
TString databaseId;
- DatabaseType databaseType = DatabaseType::Ydb;
- TEvents::TDatabaseAuth info;
+ NYql::DatabaseType databaseType = DatabaseType::Ydb;
+ NYql::TDatabaseAuth info;
TMaybe<TEndpoint> result;
HandledIds++;
if (ev->Get()->Error.empty() && (ev->Get()->Response && ((status = ev->Get()->Response->Status) == "200"))) {
@@ -131,20 +133,23 @@ private:
if (parseJsonOk && (parserIt = Parsers.find(databaseType)) != Parsers.end()) {
try {
auto res = parserIt->second(databaseInfo, MdbTransformHost);
- LOG_D("Got " << databaseId << " " << databaseType << " endpoint " << res.Endpoint << " " << res.Database);
+ LOG_D("Got db_id: " << databaseId
+ << ", db type: " << static_cast<std::underlying_type<NYql::DatabaseType>::type>(databaseType)
+ << ", endpoint: " << res.Endpoint
+ << ", database: " << res.Database);
DatabaseId2Endpoint[std::make_pair(databaseId, databaseType)] = res;
result.ConstructInPlace(res);
} catch (...) {
errorMessage = TStringBuilder()
<< " Couldn't resolve "
- << databaseType << " Id: "
- << databaseId << "\n"
+ << "databaseId: " << databaseId
+ << ", db type: " << static_cast<std::underlying_type<NYql::DatabaseType>::type>(databaseType) << "\n"
<< CurrentExceptionMessage();
}
} else {
errorMessage = TStringBuilder() << "Unable to parse database information."
<< "Database Id: " << databaseId
- << ", Database Type: " << databaseType;
+ << ", db type: " << static_cast<std::underlying_type<NYql::DatabaseType>::type>(databaseType);
}
}
} else {
@@ -182,10 +187,10 @@ private:
private:
const TActorId Sender;
TCache& Cache;
- const THashMap<NHttp::THttpOutgoingRequestPtr, std::tuple<TString, DatabaseType, TEvents::TDatabaseAuth>> Requests;
+ const THashMap<NHttp::THttpOutgoingRequestPtr, std::tuple<TString, NYql::DatabaseType, NYql::TDatabaseAuth>> Requests;
const TString TraceId;
const bool MdbTransformHost;
- THashMap<std::pair<TString, DatabaseType>, TEndpoint> DatabaseId2Endpoint;
+ THashMap<std::pair<TString, NYql::DatabaseType>, TEndpoint> DatabaseId2Endpoint;
size_t HandledIds = 0;
bool Success = true;
const TParsers& Parsers;
@@ -226,8 +231,8 @@ public:
Y_ENSURE(endpoint);
return TEndpoint{endpoint, database, secure};
};
- Parsers[DatabaseType::Ydb] = ydbParser;
- Parsers[DatabaseType::DataStreams] = [ydbParser](NJson::TJsonValue& databaseInfo, bool mdbTransformHost)
+ Parsers[NYql::DatabaseType::Ydb] = ydbParser;
+ Parsers[NYql::DatabaseType::DataStreams] = [ydbParser](NJson::TJsonValue& databaseInfo, bool mdbTransformHost)
{
auto ret = ydbParser(databaseInfo, mdbTransformHost);
// TODO: Take explicit field from MVP
@@ -237,7 +242,7 @@ public:
}
return ret;
};
- Parsers[DatabaseType::ClickHouse] = [](NJson::TJsonValue& databaseInfo, bool mdbTransformHost) {
+ Parsers[NYql::DatabaseType::ClickHouse] = [](NJson::TJsonValue& databaseInfo, bool mdbTransformHost) {
TString endpoint;
TVector<TString> aliveHosts;
for (const auto& host : databaseInfo.GetMap().at("hosts").GetArraySafe()) {
@@ -266,7 +271,7 @@ private:
void SendResponse(
const NActors::TActorId& recipient,
- THashMap<std::pair<TString, DatabaseType>, TEndpoint>&& ready,
+ THashMap<std::pair<TString, NYql::DatabaseType>, TEndpoint>&& ready,
bool success = true,
const TString& errorMessage = "")
{
@@ -275,15 +280,15 @@ private:
if (errorMessage)
issues.AddIssue(errorMessage);
Send(recipient,
- new TEvents::TEvEndpointResponse(TEvents::TDbResolverResponse{std::move(ready), success, issues}));
+ new TEvents::TEvEndpointResponse(NYql::TDbResolverResponse{std::move(ready), success, issues}));
}
void Handle(TEvents::TEvEndpointRequest::TPtr ev, const TActorContext& ctx)
{
TraceId = ev->Get()->TraceId;
LOG_D("Start databaseId resolver for " << ev->Get()->DatabaseIds.size() << " ids");
- THashMap<NHttp::THttpOutgoingRequestPtr, std::tuple<TString, DatabaseType, TEvents::TDatabaseAuth>> requests; // request, (dbId, type, info)
- THashMap<std::pair<TString, DatabaseType>, TEndpoint> ready;
+ THashMap<NHttp::THttpOutgoingRequestPtr, std::tuple<TString, NYql::DatabaseType, NYql::TDatabaseAuth>> requests; // request, (dbId, type, info)
+ THashMap<std::pair<TString, NYql::DatabaseType>, TEndpoint> ready;
for (const auto& [p, info] : ev->Get()->DatabaseIds) {
const auto& [databaseId, type] = p;
TMaybe<std::variant<TEndpoint, TString>> cacheVal;
@@ -306,7 +311,7 @@ private:
}
try {
- TString url = IsIn({ DatabaseType::Ydb, DatabaseType::DataStreams }, type) ?
+ TString url = IsIn({NYql::DatabaseType::Ydb, NYql::DatabaseType::DataStreams }, type) ?
ev->Get()->YdbMvpEndpoint + "/database?databaseId=" + databaseId :
ev->Get()->MdbGateway + "/managed-clickhouse/v1/clusters/" + databaseId + "/hosts";
LOG_D("Get '" << url << "'");
diff --git a/ydb/core/yq/libs/actors/run_actor.cpp b/ydb/core/yq/libs/actors/run_actor.cpp
index 0ca3332432e..2643150d88f 100644
--- a/ydb/core/yq/libs/actors/run_actor.cpp
+++ b/ydb/core/yq/libs/actors/run_actor.cpp
@@ -51,6 +51,7 @@
#include <library/cpp/actors/core/actor_bootstrapped.h>
#include <library/cpp/actors/core/log.h>
#include <ydb/core/yq/libs/common/entity_id.h>
+#include <ydb/core/yq/libs/db_id_async_resolver_impl/db_async_resolver_impl.h>
#include <ydb/core/yq/libs/actors/nodes_manager.h>
#include <ydb/core/yq/libs/gateway/empty_gateway.h>
#include <ydb/core/yq/libs/read_rule/read_rule_creator.h>
@@ -68,7 +69,6 @@
#include <ydb/core/yq/libs/checkpointing/checkpoint_coordinator.h>
#include <ydb/core/yq/libs/checkpointing_common/defs.h>
#include <ydb/core/yq/libs/checkpoint_storage/storage_service.h>
-#include <ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver_impl.h>
#include <ydb/core/yq/libs/private_client/private_client.h>
#define LOG_E(stream) \
diff --git a/ydb/core/yq/libs/actors/ya.make b/ydb/core/yq/libs/actors/ya.make
index 78d6d09aca5..15edfaccf4e 100644
--- a/ydb/core/yq/libs/actors/ya.make
+++ b/ydb/core/yq/libs/actors/ya.make
@@ -36,6 +36,7 @@ PEERDIR(
ydb/core/yq/libs/actors/logging
ydb/core/yq/libs/checkpointing
ydb/core/yq/libs/checkpointing_common
+ ydb/core/yq/libs/db_id_async_resolver_impl
ydb/core/yq/libs/common
ydb/core/yq/libs/control_plane_storage
ydb/core/yq/libs/control_plane_storage/events
diff --git a/ydb/core/yq/libs/common/database_token_builder.cpp b/ydb/core/yq/libs/common/database_token_builder.cpp
deleted file mode 100644
index 89d36cfba6f..00000000000
--- a/ydb/core/yq/libs/common/database_token_builder.cpp
+++ /dev/null
@@ -1,45 +0,0 @@
-#include "database_token_builder.h"
-#include <ydb/library/yql/providers/common/structured_token/yql_token_builder.h>
-
-namespace NYq {
-
-using namespace NYql;
-
- TString BuildStructuredToken(const YandexQuery::IamAuth& auth, const TString& authToken, const THashMap<TString, TString>& accountIdSignatures) {
- TStructuredTokenBuilder result;
- switch (auth.identity_case()) {
- case YandexQuery::IamAuth::kCurrentIam:
- result.SetIAMToken(authToken);
- break;
- case YandexQuery::IamAuth::kServiceAccount: {
- const auto& signature = accountIdSignatures.at(auth.service_account().id());
- result.SetServiceAccountIdAuth(auth.service_account().id(), signature);
- break;
- }
- default:
- result.SetNoAuth();
- break;
- }
-
- return result.ToJson();
- }
-
- void TryAddDatabaseToResolve(
- const YandexQuery::IamAuth& auth,
- const TString& databaseId,
- DatabaseType type,
- const TString& authToken,
- const THashMap<TString, TString>& accountIdSignatures,
- THashMap<std::pair<TString, DatabaseType>, TEvents::TDatabaseAuth>& databaseIds) {
- if (!databaseId) {
- return;
- }
-
- TEvents::TDatabaseAuth info;
- info.StructuredToken = BuildStructuredToken(auth, authToken, accountIdSignatures);
- info.AddBearerToToken = true; // XXX
- databaseIds[std::make_pair(databaseId, type)] = info;
- }
-
-} // NYq
-
diff --git a/ydb/core/yq/libs/common/database_token_builder.h b/ydb/core/yq/libs/common/database_token_builder.h
deleted file mode 100644
index a3e00efab5a..00000000000
--- a/ydb/core/yq/libs/common/database_token_builder.h
+++ /dev/null
@@ -1,24 +0,0 @@
-#pragma once
-
-#include <ydb/core/yq/libs/control_plane_storage/events/events.h>
-#include <ydb/core/yq/libs/events/events.h>
-
-namespace NYq {
-
-using namespace NActors;
-using namespace NYql;
-
- TString BuildStructuredToken(
- const YandexQuery::IamAuth& auth,
- const TString& authToken,
- const THashMap<TString, TString>& accountIdSignatures);
-
- void TryAddDatabaseToResolve(
- const YandexQuery::IamAuth& auth,
- const TString& databaseId,
- DatabaseType type,
- const TString& authToken,
- const THashMap<TString, TString>& accountIdSignatures,
- THashMap<std::pair<TString, DatabaseType>, TEvents::TDatabaseAuth>& databaseIds);
-
-} // NYq
diff --git a/ydb/core/yq/libs/common/ya.make b/ydb/core/yq/libs/common/ya.make
index 4622695700a..d2527b63473 100644
--- a/ydb/core/yq/libs/common/ya.make
+++ b/ydb/core/yq/libs/common/ya.make
@@ -3,7 +3,6 @@ OWNER(g:yq)
LIBRARY()
SRCS(
- database_token_builder.cpp
entity_id.cpp
rows_proto_splitter.cpp
)
diff --git a/ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver_impl.cpp b/ydb/core/yq/libs/db_id_async_resolver_impl/db_async_resolver_impl.cpp
index e13093b4637..242fcf86c93 100644
--- a/ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver_impl.cpp
+++ b/ydb/core/yq/libs/db_id_async_resolver_impl/db_async_resolver_impl.cpp
@@ -18,10 +18,10 @@ TDatabaseAsyncResolverImpl::TDatabaseAsyncResolverImpl(
, TraceId(traceId)
{}
-TFuture<TEvents::TDbResolverResponse> TDatabaseAsyncResolverImpl::ResolveIds(
- const THashMap<std::pair<TString, DatabaseType>, TEvents::TDatabaseAuth>& ids) const
+TFuture<NYql::TDbResolverResponse> TDatabaseAsyncResolverImpl::ResolveIds(
+ const THashMap<std::pair<TString, NYql::DatabaseType>, NYql::TDatabaseAuth>& ids) const
{
- auto promise = NewPromise<TEvents::TDbResolverResponse>();
+ auto promise = NewPromise<NYql::TDbResolverResponse>();
TDuration timeout = TDuration::Seconds(40);
auto callback = MakeHolder<NYql::TRichActorFutureCallback<TEvents::TEvEndpointResponse>>(
[promise] (TAutoPtr<NActors::TEventHandle<TEvents::TEvEndpointResponse>>& event) mutable {
diff --git a/ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver_impl.h b/ydb/core/yq/libs/db_id_async_resolver_impl/db_async_resolver_impl.h
index 10ca0c956e4..22c248d6076 100644
--- a/ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver_impl.h
+++ b/ydb/core/yq/libs/db_id_async_resolver_impl/db_async_resolver_impl.h
@@ -1,10 +1,11 @@
#pragma once
-#include "db_async_resolver.h"
+#include <ydb/core/yq/libs/events/events.h>
+#include <ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver.h>
#include <ydb/library/yql/providers/dq/actors/actor_helpers.h>
namespace NYq {
-class TDatabaseAsyncResolverImpl : public IDatabaseAsyncResolver {
+class TDatabaseAsyncResolverImpl : public NYql::IDatabaseAsyncResolver {
public:
TDatabaseAsyncResolverImpl(
NActors::TActorSystem* actorSystem,
@@ -15,8 +16,8 @@ public:
const TString& traceId = ""
);
- NThreading::TFuture<TEvents::TDbResolverResponse> ResolveIds(
- const THashMap<std::pair<TString, DatabaseType>, TEvents::TDatabaseAuth>& ids) const override;
+ NThreading::TFuture<NYql::TDbResolverResponse> ResolveIds(
+ const THashMap<std::pair<TString, NYql::DatabaseType>, NYql::TDatabaseAuth>& ids) const override;
private:
NActors::TActorSystem* ActorSystem;
const NActors::TActorId Recipient;
diff --git a/ydb/core/yq/libs/db_id_async_resolver_impl/ya.make b/ydb/core/yq/libs/db_id_async_resolver_impl/ya.make
new file mode 100644
index 00000000000..505c600686b
--- /dev/null
+++ b/ydb/core/yq/libs/db_id_async_resolver_impl/ya.make
@@ -0,0 +1,18 @@
+OWNER(g:yq)
+
+LIBRARY()
+
+SRCS(
+ db_async_resolver_impl.cpp
+)
+
+PEERDIR(
+ library/cpp/threading/future
+ ydb/core/yq/libs/events
+ ydb/library/yql/providers/common/db_id_async_resolver
+ ydb/library/yql/providers/dq/actors
+)
+
+YQL_LAST_ABI_VERSION()
+
+END()
diff --git a/ydb/core/yq/libs/events/events.h b/ydb/core/yq/libs/events/events.h
index 36c864dca8d..0ba106b33aa 100644
--- a/ydb/core/yq/libs/events/events.h
+++ b/ydb/core/yq/libs/events/events.h
@@ -2,6 +2,7 @@
#include "event_ids.h"
#include <ydb/library/yql/core/facade/yql_facade.h>
+#include <ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver.h>
#include <ydb/library/yql/providers/dq/provider/yql_dq_gateway.h>
#include <ydb/library/yql/public/issue/yql_issue.h>
@@ -18,13 +19,6 @@ namespace NYq {
using NYdb::NYq::TScope;
-enum class DatabaseType {
- Ydb,
- ClickHouse,
- DataStreams,
- ObjectStorage
-};
-
struct TQueryResult {
TVector<Ydb::ResultSet> Sets;
TInstant ExpirationDeadline;
@@ -126,54 +120,21 @@ struct TEvents {
{}
};
- struct TDbResolverResponse {
- struct TEndpoint {
- TString Endpoint;
- TString Database;
- bool Secure = false;
- };
-
- TDbResolverResponse() = default;
-
- TDbResolverResponse(
- THashMap<std::pair<TString, DatabaseType>, TEndpoint>&& databaseId2Endpoint,
- bool success = false,
- const NYql::TIssues& issues = {})
- : DatabaseId2Endpoint(databaseId2Endpoint)
- , Success(success)
- , Issues(issues) {}
-
- THashMap<std::pair<TString, DatabaseType>, TEndpoint> DatabaseId2Endpoint;
- bool Success = false;
- NYql::TIssues Issues;
- };
-
struct TEvEndpointResponse : NActors::TEventLocal<TEvEndpointResponse, TEventIds::EvEndpointResponse> {
- TDbResolverResponse DbResolverResponse;
- explicit TEvEndpointResponse(TDbResolverResponse&& response) noexcept : DbResolverResponse(std::move(response)) {}
+ NYql::TDbResolverResponse DbResolverResponse;
+ explicit TEvEndpointResponse(NYql::TDbResolverResponse&& response) noexcept : DbResolverResponse(std::move(response)) {}
};
- struct TDatabaseAuth {
- TString StructuredToken;
- bool AddBearerToToken = false;
-
- bool operator==(const TDatabaseAuth& other) const {
- return std::tie(StructuredToken, AddBearerToToken) == std::tie(other.StructuredToken, other.AddBearerToToken);
- }
- bool operator!=(const TDatabaseAuth& other) const {
- return !(*this == other);
- }
- };
struct TEvEndpointRequest : NActors::TEventLocal<TEvEndpointRequest, TEventIds::EvEndpointRequest> {
- THashMap<std::pair<TString, DatabaseType>, TDatabaseAuth> DatabaseIds; // DbId, DatabaseType => database auth
+ THashMap<std::pair<TString, NYql::DatabaseType>, NYql::TDatabaseAuth> DatabaseIds; // DbId, DatabaseType => database auth
TString YdbMvpEndpoint;
TString MdbGateway;
TString TraceId;
bool MdbTransformHost;
TEvEndpointRequest(
- const THashMap<std::pair<TString, DatabaseType>, TDatabaseAuth>& databaseIds,
+ const THashMap<std::pair<TString, NYql::DatabaseType>, NYql::TDatabaseAuth>& databaseIds,
const TString& ydbMvpEndpoint,
const TString& mdbGateway,
const TString& traceId,
@@ -193,6 +154,7 @@ struct TEvents {
}
NYql::TIssues Issues;
+ NYql::TIssues TransientIssues;
};
struct TEvDataStreamsReadRulesDeletionResult : NActors::TEventLocal<TEvDataStreamsReadRulesDeletionResult, TEventIds::EvDataStreamsReadRulesDeletionResult> {
@@ -261,8 +223,8 @@ struct TEvents {
} // namespace NYq
template<>
-struct THash<NYq::TEvents::TDatabaseAuth> {
- inline ui64 operator()(const NYq::TEvents::TDatabaseAuth& x) const noexcept {
+struct THash<NYql::TDatabaseAuth> {
+ inline ui64 operator()(const NYql::TDatabaseAuth& x) const noexcept {
return MultiHash(x.StructuredToken, x.AddBearerToToken);
}
};
diff --git a/ydb/core/yq/libs/events/ya.make b/ydb/core/yq/libs/events/ya.make
index c12bdd681f4..a8310348683 100644
--- a/ydb/core/yq/libs/events/ya.make
+++ b/ydb/core/yq/libs/events/ya.make
@@ -14,6 +14,7 @@ PEERDIR(
ydb/public/api/protos
ydb/public/lib/yq
ydb/public/sdk/cpp/client/ydb_table
+ ydb/library/yql/providers/common/db_id_async_resolver
ydb/library/yql/providers/dq/provider
)
diff --git a/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_io_discovery.cpp b/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_io_discovery.cpp
index 704431b1fb0..ed5db76023f 100644
--- a/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_io_discovery.cpp
+++ b/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_io_discovery.cpp
@@ -6,7 +6,6 @@
#include <ydb/library/yql/core/yql_graph_transformer.h>
#include <ydb/library/yql/ast/yql_expr.h>
-#include <ydb/core/yq/libs/events/events.h>
namespace NYql {
@@ -16,7 +15,7 @@ using namespace NNodes;
class TClickHouseIODiscoveryTransformer : public TGraphTransformerBase {
-using TDbId2Endpoint = THashMap<std::pair<TString, NYq::DatabaseType>, NYq::TEvents::TDbResolverResponse::TEndpoint>;
+using TDbId2Endpoint = THashMap<std::pair<TString, NYql::DatabaseType>, NYql::TDbResolverResponse::TEndpoint>;
public:
TClickHouseIODiscoveryTransformer(TClickHouseState::TPtr state)
@@ -32,7 +31,7 @@ public:
if (!State_->DbResolver)
return TStatus::Ok;
- THashMap<std::pair<TString, NYq::DatabaseType>, NYq::TEvents::TDatabaseAuth> ids;
+ THashMap<std::pair<TString, NYql::DatabaseType>, NYql::TDatabaseAuth> ids;
if (auto reads = FindNodes(input, [&](const TExprNode::TPtr& node) {
const TExprBase nodeExpr(node);
if (!nodeExpr.Maybe<TClRead>())
@@ -51,7 +50,7 @@ public:
auto dbId = State_->Configuration->Endpoints[cluster].first;
dbId = dbId.substr(0, dbId.find(':'));
YQL_CLOG(DEBUG, ProviderClickHouse) << "Found dbId: " << dbId;
- const auto idKey = std::make_pair(dbId, NYq::DatabaseType::ClickHouse);
+ const auto idKey = std::make_pair(dbId, NYql::DatabaseType::ClickHouse);
const auto iter = State_->DatabaseIds.find(idKey);
if (iter != State_->DatabaseIds.end()) {
YQL_CLOG(DEBUG, ProviderClickHouse) << "Resolve CH id: " << dbId;
@@ -65,7 +64,7 @@ public:
return TStatus::Ok;
}
- const std::weak_ptr<NYq::TEvents::TDbResolverResponse> response = DbResolverResponse_;
+ const std::weak_ptr<NYql::TDbResolverResponse> response = DbResolverResponse_;
AsyncFuture_ = State_->DbResolver->ResolveIds(ids).Apply([response](auto future) {
if (const auto res = response.lock())
*res = std::move(future.ExtractValue());
@@ -85,7 +84,7 @@ public:
return TStatus::Error;
}
FullResolvedIds_.insert(DbResolverResponse_->DatabaseId2Endpoint.begin(), DbResolverResponse_->DatabaseId2Endpoint.end());
- DbResolverResponse_ = std::make_shared<NYq::TEvents::TDbResolverResponse>();
+ DbResolverResponse_ = std::make_shared<NYql::TDbResolverResponse>();
YQL_CLOG(DEBUG, ProviderClickHouse) << "ResolvedIds: " << FullResolvedIds_.size();
auto& endpoints = State_->Configuration->Endpoints;
const auto& id2Clusters = State_->Configuration->DbId2Clusters;
@@ -115,7 +114,7 @@ private:
NThreading::TFuture<void> AsyncFuture_;
TDbId2Endpoint FullResolvedIds_;
- std::shared_ptr<NYq::TEvents::TDbResolverResponse> DbResolverResponse_ = std::make_shared<NYq::TEvents::TDbResolverResponse>();
+ std::shared_ptr<NYql::TDbResolverResponse> DbResolverResponse_ = std::make_shared<NYql::TDbResolverResponse>();
};
}
diff --git a/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_provider.cpp b/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_provider.cpp
index 3cdfb17866e..3f309849333 100644
--- a/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_provider.cpp
+++ b/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_provider.cpp
@@ -6,7 +6,7 @@ namespace NYql {
TDataProviderInitializer GetClickHouseDataProviderInitializer(
IHTTPGateway::TPtr gateway,
- const std::shared_ptr<NYq::IDatabaseAsyncResolver> dbResolver)
+ const std::shared_ptr<NYql::IDatabaseAsyncResolver> dbResolver)
{
return [gateway, dbResolver] (
const TString& userName,
diff --git a/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_provider.h b/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_provider.h
index 17ebb38b7b3..849ed534ed8 100644
--- a/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_provider.h
+++ b/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_provider.h
@@ -27,13 +27,13 @@ struct TClickHouseState : public TThrRefBase
TTypeAnnotationContext* Types = nullptr;
TClickHouseConfiguration::TPtr Configuration = MakeIntrusive<TClickHouseConfiguration>();
const NKikimr::NMiniKQL::IFunctionRegistry* FunctionRegistry = nullptr;
- THashMap<std::pair<TString, NYq::DatabaseType>, NYq::TEvents::TDatabaseAuth> DatabaseIds;
- std::shared_ptr<NYq::IDatabaseAsyncResolver> DbResolver;
+ THashMap<std::pair<TString, NYql::DatabaseType>, NYql::TDatabaseAuth> DatabaseIds;
+ std::shared_ptr<NYql::IDatabaseAsyncResolver> DbResolver;
};
TDataProviderInitializer GetClickHouseDataProviderInitializer(
IHTTPGateway::TPtr gateway,
- std::shared_ptr<NYq::IDatabaseAsyncResolver> dbResolver = nullptr
+ std::shared_ptr<NYql::IDatabaseAsyncResolver> dbResolver = nullptr
);
TIntrusivePtr<IDataProvider> CreateClickHouseDataSource(TClickHouseState::TPtr state, IHTTPGateway::TPtr gateway);
diff --git a/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_settings.h b/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_settings.h
index c206844b4f9..bcf1acd4f2d 100644
--- a/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_settings.h
+++ b/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_settings.h
@@ -6,8 +6,6 @@
#include <ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver.h>
#include <ydb/library/yql/providers/common/proto/gateways_config.pb.h>
-#include <ydb/core/yq/libs/events/events.h>
-
namespace NYql {
@@ -24,8 +22,8 @@ struct TClickHouseConfiguration : public TClickHouseSettings, public NCommon::TS
template <typename TProtoConfig>
void Init(
const TProtoConfig& config,
- const std::shared_ptr<NYq::IDatabaseAsyncResolver> dbResolver,
- THashMap<std::pair<TString, NYq::DatabaseType>, NYq::TEvents::TDatabaseAuth>& databaseIds)
+ const std::shared_ptr<NYql::IDatabaseAsyncResolver> dbResolver,
+ THashMap<std::pair<TString, NYql::DatabaseType>, NYql::TDatabaseAuth>& databaseIds)
{
TVector<TString> clusters(Reserve(config.ClusterMappingSize()));
for (auto& cluster: config.GetClusterMapping()) {
@@ -41,8 +39,8 @@ struct TClickHouseConfiguration : public TClickHouseSettings, public NCommon::TS
if (dbResolver) {
YQL_CLOG(DEBUG, ProviderClickHouse) << "Settings: clusterName = " << cluster.GetName()
<< ", clusterDbId = " << cluster.GetId() << ", cluster.GetCluster(): " << cluster.GetCluster() << ", HasCluster: " << (cluster.HasCluster() ? "TRUE" : "FALSE") ;
- databaseIds[std::make_pair(cluster.GetId(), NYq::DatabaseType::ClickHouse)] =
- NYq::TEvents::TDatabaseAuth{cluster.GetCHToken(), /*AddBearer=*/false};
+ databaseIds[std::make_pair(cluster.GetId(), NYql::DatabaseType::ClickHouse)] =
+ NYql::TDatabaseAuth{cluster.GetCHToken(), /*AddBearer=*/false};
if (cluster.GetId()) {
DbId2Clusters[cluster.GetId()].emplace_back(cluster.GetName());
YQL_CLOG(DEBUG, ProviderClickHouse) << "Add dbId: " << cluster.GetId() << " to DbId2Clusters";
diff --git a/ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver.h b/ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver.h
index 733bed54b02..bf4983ad13f 100644
--- a/ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver.h
+++ b/ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver.h
@@ -1,16 +1,57 @@
#pragma once
+#include <ydb/library/yql/public/issue/yql_issue.h>
+
#include <library/cpp/threading/future/future.h>
-#include <ydb/core/yq/libs/events/events.h>
+namespace NYql {
+
+enum class DatabaseType {
+ Ydb,
+ ClickHouse,
+ DataStreams,
+ ObjectStorage
+};
+
+struct TDatabaseAuth {
+ TString StructuredToken;
+ bool AddBearerToToken = false;
+
+ bool operator==(const TDatabaseAuth& other) const {
+ return std::tie(StructuredToken, AddBearerToToken) == std::tie(other.StructuredToken, other.AddBearerToToken);
+ }
+ bool operator!=(const TDatabaseAuth& other) const {
+ return !(*this == other);
+ }
+};
+
+struct TDbResolverResponse {
+ struct TEndpoint {
+ TString Endpoint;
+ TString Database;
+ bool Secure = false;
+ };
+
+ TDbResolverResponse() = default;
-namespace NYq {
+ TDbResolverResponse(
+ THashMap<std::pair<TString, DatabaseType>, TEndpoint>&& databaseId2Endpoint,
+ bool success = false,
+ const NYql::TIssues& issues = {})
+ : DatabaseId2Endpoint(std::move(databaseId2Endpoint))
+ , Success(success)
+ , Issues(issues) {}
+
+ THashMap<std::pair<TString, DatabaseType>, TEndpoint> DatabaseId2Endpoint;
+ bool Success = false;
+ NYql::TIssues Issues;
+};
class IDatabaseAsyncResolver {
public:
- virtual NThreading::TFuture<TEvents::TDbResolverResponse> ResolveIds(
- const THashMap<std::pair<TString, DatabaseType>, TEvents::TDatabaseAuth>& ids) const = 0;
+ virtual NThreading::TFuture<NYql::TDbResolverResponse> ResolveIds(
+ const THashMap<std::pair<TString, DatabaseType>, NYql::TDatabaseAuth>& ids) const = 0;
virtual ~IDatabaseAsyncResolver() = default;
};
-} // NYq
+} // NYql
diff --git a/ydb/library/yql/providers/common/db_id_async_resolver/ya.make b/ydb/library/yql/providers/common/db_id_async_resolver/ya.make
index 659fb003039..7d9b2937336 100644
--- a/ydb/library/yql/providers/common/db_id_async_resolver/ya.make
+++ b/ydb/library/yql/providers/common/db_id_async_resolver/ya.make
@@ -3,13 +3,12 @@ OWNER(g:yq)
LIBRARY()
SRCS(
- db_async_resolver_impl.cpp
+ db_async_resolver.h
)
PEERDIR(
library/cpp/threading/future
- ydb/core/yq/libs/events
- ydb/library/yql/providers/dq/actors
+ ydb/library/yql/public/issue
)
YQL_LAST_ABI_VERSION()
diff --git a/ydb/library/yql/providers/pq/provider/yql_pq_datasink_io_discovery.cpp b/ydb/library/yql/providers/pq/provider/yql_pq_datasink_io_discovery.cpp
index 349798c9310..e73655b6c26 100644
--- a/ydb/library/yql/providers/pq/provider/yql_pq_datasink_io_discovery.cpp
+++ b/ydb/library/yql/providers/pq/provider/yql_pq_datasink_io_discovery.cpp
@@ -12,7 +12,7 @@ namespace {
using namespace NNodes;
class TPqDataSinkIODiscoveryTransformer : public TGraphTransformerBase {
-using TDbId2Endpoint = THashMap<std::pair<TString, NYq::DatabaseType>, NYq::TEvents::TDbResolverResponse::TEndpoint>;
+using TDbId2Endpoint = THashMap<std::pair<TString, NYql::DatabaseType>, NYql::TDbResolverResponse::TEndpoint>;
public:
explicit TPqDataSinkIODiscoveryTransformer(TPqState::TPtr state)
: State_(state)
@@ -28,13 +28,13 @@ public:
if (!State_->DbResolver)
return TStatus::Ok;
- THashMap<std::pair<TString, NYq::DatabaseType>, NYq::TEvents::TDatabaseAuth> ids;
+ THashMap<std::pair<TString, NYql::DatabaseType>, NYql::TDatabaseAuth> ids;
FindYdsDbIdsForResolving(State_, input, ids);
if (ids.empty())
return TStatus::Ok;
- const std::weak_ptr<NYq::TEvents::TDbResolverResponse> response = DbResolverResponse_;
+ const std::weak_ptr<NYql::TDbResolverResponse> response = DbResolverResponse_;
AsyncFuture_ = State_->DbResolver->ResolveIds(ids).Apply([response](auto future)
{
if (const auto res = response.lock())
@@ -55,7 +55,7 @@ public:
return TStatus::Error;
}
FullResolvedIds_.insert(DbResolverResponse_->DatabaseId2Endpoint.begin(), DbResolverResponse_->DatabaseId2Endpoint.end());
- DbResolverResponse_ = std::make_shared<NYq::TEvents::TDbResolverResponse>();
+ DbResolverResponse_ = std::make_shared<NYql::TDbResolverResponse>();
FillSettingsWithResolvedYdsIds(State_, FullResolvedIds_);
return TStatus::Ok;
}
@@ -64,7 +64,7 @@ private:
const TPqState::TPtr State_;
NThreading::TFuture<void> AsyncFuture_;
TDbId2Endpoint FullResolvedIds_;
- std::shared_ptr<NYq::TEvents::TDbResolverResponse> DbResolverResponse_ = std::make_shared<NYq::TEvents::TDbResolverResponse>();
+ std::shared_ptr<NYql::TDbResolverResponse> DbResolverResponse_ = std::make_shared<NYql::TDbResolverResponse>();
};
}
diff --git a/ydb/library/yql/providers/pq/provider/yql_pq_helpers.cpp b/ydb/library/yql/providers/pq/provider/yql_pq_helpers.cpp
index 6b3590ef045..7bc56eef366 100644
--- a/ydb/library/yql/providers/pq/provider/yql_pq_helpers.cpp
+++ b/ydb/library/yql/providers/pq/provider/yql_pq_helpers.cpp
@@ -29,7 +29,7 @@ TCoNameValueTupleList BuildTopicPropsList(const TPqState::TTopicMeta& meta, TPos
void FindYdsDbIdsForResolving(
const TPqState::TPtr& state,
TExprNode::TPtr input,
- THashMap<std::pair<TString, NYq::DatabaseType>, NYq::TEvents::TDatabaseAuth>& ids)
+ THashMap<std::pair<TString, NYql::DatabaseType>, NYql::TDatabaseAuth>& ids)
{
if (auto pqNodes = FindNodes(input, [&](const TExprNode::TPtr& node) {
if (auto maybePqRead = TMaybeNode<TPqRead>(node)) {
@@ -67,7 +67,7 @@ void FindYdsDbIdsForResolving(
if (!foundSetting->second.DatabaseId)
continue;
YQL_CLOG(INFO, ProviderPq) << "Resolve YDS id: " << foundSetting->second.DatabaseId;
- const auto idKey = std::make_pair(foundSetting->second.DatabaseId, NYq::DatabaseType::DataStreams);
+ const auto idKey = std::make_pair(foundSetting->second.DatabaseId, NYql::DatabaseType::DataStreams);
const auto foundDbId = state->DatabaseIds.find(idKey);
if (foundDbId != state->DatabaseIds.end()) {
ids[idKey] = foundDbId->second;
@@ -79,7 +79,7 @@ void FindYdsDbIdsForResolving(
void FillSettingsWithResolvedYdsIds(
const TPqState::TPtr& state,
- const THashMap<std::pair<TString, NYq::DatabaseType>, NYq::TEvents::TDbResolverResponse::TEndpoint>& fullResolvedIds)
+ const THashMap<std::pair<TString, NYql::DatabaseType>, NYql::TDbResolverResponse::TEndpoint>& fullResolvedIds)
{
YQL_CLOG(INFO, ProviderPq) << "FullResolvedIds size: " << fullResolvedIds.size();
auto& clusters = state->Configuration->ClustersConfigurationSettings;
diff --git a/ydb/library/yql/providers/pq/provider/yql_pq_helpers.h b/ydb/library/yql/providers/pq/provider/yql_pq_helpers.h
index f1f808fe315..7804e63a34c 100644
--- a/ydb/library/yql/providers/pq/provider/yql_pq_helpers.h
+++ b/ydb/library/yql/providers/pq/provider/yql_pq_helpers.h
@@ -15,10 +15,10 @@ NNodes::TCoNameValueTupleList BuildTopicPropsList(const TPqState::TTopicMeta& me
void FindYdsDbIdsForResolving(
const TPqState::TPtr& state,
TExprNode::TPtr input,
- THashMap<std::pair<TString, NYq::DatabaseType>, NYq::TEvents::TDatabaseAuth>& ids);
+ THashMap<std::pair<TString, NYql::DatabaseType>, NYql::TDatabaseAuth>& ids);
void FillSettingsWithResolvedYdsIds(
const TPqState::TPtr& state,
- const THashMap<std::pair<TString, NYq::DatabaseType>, NYq::TEvents::TDbResolverResponse::TEndpoint>& fullResolvedIds);
+ const THashMap<std::pair<TString, NYql::DatabaseType>, NYql::TDbResolverResponse::TEndpoint>& fullResolvedIds);
} // namespace NYql
diff --git a/ydb/library/yql/providers/pq/provider/yql_pq_io_discovery.cpp b/ydb/library/yql/providers/pq/provider/yql_pq_io_discovery.cpp
index ef26fc7a82f..b71a33474ea 100644
--- a/ydb/library/yql/providers/pq/provider/yql_pq_io_discovery.cpp
+++ b/ydb/library/yql/providers/pq/provider/yql_pq_io_discovery.cpp
@@ -13,7 +13,7 @@ using namespace NNodes;
class TPqIODiscoveryTransformer : public TGraphTransformerBase {
-using TDbId2Endpoint = THashMap<std::pair<TString, NYq::DatabaseType>, NYq::TEvents::TDbResolverResponse::TEndpoint>;
+using TDbId2Endpoint = THashMap<std::pair<TString, NYql::DatabaseType>, NYql::TDbResolverResponse::TEndpoint>;
public:
explicit TPqIODiscoveryTransformer(TPqState::TPtr state)
@@ -30,13 +30,13 @@ public:
if (!State_->DbResolver)
return TStatus::Ok;
- THashMap<std::pair<TString, NYq::DatabaseType>, NYq::TEvents::TDatabaseAuth> ids;
+ THashMap<std::pair<TString, NYql::DatabaseType>, NYql::TDatabaseAuth> ids;
FindYdsDbIdsForResolving(State_, input, ids);
if (ids.empty())
return TStatus::Ok;
- const std::weak_ptr<NYq::TEvents::TDbResolverResponse> response = DbResolverResponse_;
+ const std::weak_ptr<NYql::TDbResolverResponse> response = DbResolverResponse_;
AsyncFuture_ = State_->DbResolver->ResolveIds(ids).Apply([response](auto future)
{
if (const auto res = response.lock())
@@ -57,7 +57,7 @@ public:
return TStatus::Error;
}
FullResolvedIds_.insert(DbResolverResponse_->DatabaseId2Endpoint.begin(), DbResolverResponse_->DatabaseId2Endpoint.end());
- DbResolverResponse_ = std::make_shared<NYq::TEvents::TDbResolverResponse>();
+ DbResolverResponse_ = std::make_shared<NYql::TDbResolverResponse>();
FillSettingsWithResolvedYdsIds(State_, FullResolvedIds_);
return TStatus::Ok;
}
@@ -66,7 +66,7 @@ private:
const TPqState::TPtr State_;
NThreading::TFuture<void> AsyncFuture_;
TDbId2Endpoint FullResolvedIds_;
- std::shared_ptr<NYq::TEvents::TDbResolverResponse> DbResolverResponse_ = std::make_shared<NYq::TEvents::TDbResolverResponse>();
+ std::shared_ptr<NYql::TDbResolverResponse> DbResolverResponse_ = std::make_shared<NYql::TDbResolverResponse>();
};
}
diff --git a/ydb/library/yql/providers/pq/provider/yql_pq_provider.cpp b/ydb/library/yql/providers/pq/provider/yql_pq_provider.cpp
index 5b51d4c0f8c..d7f6cc3828d 100644
--- a/ydb/library/yql/providers/pq/provider/yql_pq_provider.cpp
+++ b/ydb/library/yql/providers/pq/provider/yql_pq_provider.cpp
@@ -12,7 +12,7 @@ namespace NYql {
TDataProviderInitializer GetPqDataProviderInitializer(
IPqGateway::TPtr gateway,
bool supportRtmrMode,
- std::shared_ptr<NYq::IDatabaseAsyncResolver> dbResolver) {
+ std::shared_ptr<NYql::IDatabaseAsyncResolver> dbResolver) {
return [gateway, supportRtmrMode, dbResolver] (
const TString& userName,
const TString& sessionId,
diff --git a/ydb/library/yql/providers/pq/provider/yql_pq_provider.h b/ydb/library/yql/providers/pq/provider/yql_pq_provider.h
index 7e2e39b746c..bc9b4872c28 100644
--- a/ydb/library/yql/providers/pq/provider/yql_pq_provider.h
+++ b/ydb/library/yql/providers/pq/provider/yql_pq_provider.h
@@ -52,14 +52,14 @@ public:
const NKikimr::NMiniKQL::IFunctionRegistry* FunctionRegistry = nullptr;
IPqGateway::TPtr Gateway;
THolder<IDqIntegration> DqIntegration;
- THashMap<std::pair<TString, NYq::DatabaseType>, NYq::TEvents::TDatabaseAuth> DatabaseIds;
- std::shared_ptr<NYq::IDatabaseAsyncResolver> DbResolver;
+ THashMap<std::pair<TString, NYql::DatabaseType>, NYql::TDatabaseAuth> DatabaseIds;
+ std::shared_ptr<NYql::IDatabaseAsyncResolver> DbResolver;
};
TDataProviderInitializer GetPqDataProviderInitializer(
IPqGateway::TPtr gateway,
bool supportRtmrMode = false,
- std::shared_ptr<NYq::IDatabaseAsyncResolver> dbResolver = nullptr
+ std::shared_ptr<NYql::IDatabaseAsyncResolver> dbResolver = nullptr
);
} // namespace NYql
diff --git a/ydb/library/yql/providers/pq/provider/yql_pq_settings.cpp b/ydb/library/yql/providers/pq/provider/yql_pq_settings.cpp
index e4dead34134..c44b0ce71f4 100644
--- a/ydb/library/yql/providers/pq/provider/yql_pq_settings.cpp
+++ b/ydb/library/yql/providers/pq/provider/yql_pq_settings.cpp
@@ -17,8 +17,8 @@ TPqSettings::TConstPtr TPqConfiguration::Snapshot() const {
void TPqConfiguration::Init(
const TPqGatewayConfig& config,
TIntrusivePtr<TTypeAnnotationContext> typeCtx,
- const std::shared_ptr<NYq::IDatabaseAsyncResolver> dbResolver,
- THashMap<std::pair<TString, NYq::DatabaseType>, NYq::TEvents::TDatabaseAuth>& databaseIds)
+ const std::shared_ptr<NYql::IDatabaseAsyncResolver> dbResolver,
+ THashMap<std::pair<TString, NYql::DatabaseType>, NYql::TDatabaseAuth>& databaseIds)
{
TVector<TString> clusters(Reserve(config.ClusterMappingSize()));
for (auto& cluster: config.GetClusterMapping()) {
@@ -52,8 +52,8 @@ void TPqConfiguration::Init(
YQL_CLOG(DEBUG, ProviderPq) << "Settings: clusterName = " << cluster.GetName()
<< ", clusterDbId = " << cluster.GetDatabaseId() << ", cluster.GetEndpoint(): " << cluster.GetEndpoint() << ", HasEndpoint = " << (cluster.HasEndpoint() ? "TRUE" : "FALSE") ;
if (cluster.GetDatabaseId()) {
- databaseIds[std::make_pair(cluster.GetDatabaseId(), NYq::DatabaseType::DataStreams)] =
- NYq::TEvents::TDatabaseAuth{structuredTokenJson, cluster.GetAddBearerToToken()};
+ databaseIds[std::make_pair(cluster.GetDatabaseId(), NYql::DatabaseType::DataStreams)] =
+ NYql::TDatabaseAuth{structuredTokenJson, cluster.GetAddBearerToToken()};
DbId2Clusters[cluster.GetDatabaseId()].emplace_back(cluster.GetName());
YQL_CLOG(DEBUG, ProviderPq) << "Add dbId: " << cluster.GetDatabaseId() << " to DbId2Clusters";
}
diff --git a/ydb/library/yql/providers/pq/provider/yql_pq_settings.h b/ydb/library/yql/providers/pq/provider/yql_pq_settings.h
index 1337d7dac24..382af9c82b7 100644
--- a/ydb/library/yql/providers/pq/provider/yql_pq_settings.h
+++ b/ydb/library/yql/providers/pq/provider/yql_pq_settings.h
@@ -6,7 +6,6 @@
#include <ydb/library/yql/providers/common/config/yql_setting.h>
#include <ydb/library/yql/providers/common/proto/gateways_config.pb.h>
-#include <ydb/core/yq/libs/events/events.h>
#include <ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver.h>
namespace NYql {
@@ -41,8 +40,8 @@ struct TPqConfiguration : public TPqSettings, public NCommon::TSettingDispatcher
void Init(
const TPqGatewayConfig& config,
TIntrusivePtr<TTypeAnnotationContext> typeCtx,
- const std::shared_ptr<NYq::IDatabaseAsyncResolver> dbResolver,
- THashMap<std::pair<TString, NYq::DatabaseType>, NYq::TEvents::TDatabaseAuth>& databaseIds);
+ const std::shared_ptr<NYql::IDatabaseAsyncResolver> dbResolver,
+ THashMap<std::pair<TString, NYql::DatabaseType>, NYql::TDatabaseAuth>& databaseIds);
TString GetDatabaseForTopic(const TString& cluster) const;
diff --git a/ydb/library/yql/providers/ydb/provider/yql_ydb_io_discovery.cpp b/ydb/library/yql/providers/ydb/provider/yql_ydb_io_discovery.cpp
index ff0c5fd5087..36e9890a55b 100644
--- a/ydb/library/yql/providers/ydb/provider/yql_ydb_io_discovery.cpp
+++ b/ydb/library/yql/providers/ydb/provider/yql_ydb_io_discovery.cpp
@@ -1,5 +1,4 @@
#include "yql_ydb_provider_impl.h"
-#include <ydb/core/yq/libs/events/events.h>
#include <ydb/library/yql/providers/ydb/expr_nodes/yql_ydb_expr_nodes.h>
#include <ydb/library/yql/providers/common/provider/yql_provider_names.h>
@@ -10,7 +9,6 @@
#include <ydb/library/yql/utils/log/log.h>
#include <ydb/library/yql/public/udf/udf_types.h>
#include <ydb/library/yql/ast/yql_expr.h>
-#include <ydb/core/yq/libs/events/events.h>
namespace NYql {
@@ -19,7 +17,7 @@ namespace {
using namespace NNodes;
class TYdbIODiscoveryTransformer : public TGraphTransformerBase {
-using TDbId2Endpoint = THashMap<std::pair<TString, NYq::DatabaseType>, NYq::TEvents::TDbResolverResponse::TEndpoint>;
+using TDbId2Endpoint = THashMap<std::pair<TString, NYql::DatabaseType>, NYql::TDbResolverResponse::TEndpoint>;
public:
TYdbIODiscoveryTransformer(TYdbState::TPtr state)
: State_(std::move(state))
@@ -34,7 +32,7 @@ public:
if (!State_->DbResolver)
return TStatus::Ok;
- THashMap<std::pair<TString, NYq::DatabaseType>, NYq::TEvents::TDatabaseAuth> ids;
+ THashMap<std::pair<TString, NYql::DatabaseType>, NYql::TDatabaseAuth> ids;
if (auto reads = FindNodes(input, [&](const TExprNode::TPtr& node) {
const TExprBase nodeExpr(node);
if (!nodeExpr.Maybe<TYdbRead>())
@@ -49,7 +47,7 @@ public:
const TYdbRead read(node);
const auto& cluster = read.DataSource().Cluster().StringValue();
const auto& dbId = State_->Configuration->Clusters[cluster].DatabaseId;
- const auto idKey = std::make_pair(dbId, NYq::DatabaseType::Ydb);
+ const auto idKey = std::make_pair(dbId, NYql::DatabaseType::Ydb);
const auto iter = State_->DatabaseIds.find(idKey);
if (iter != State_->DatabaseIds.end()) {
ids[idKey] = iter->second;
@@ -59,7 +57,7 @@ public:
if (ids.empty()) {
return TStatus::Ok;
}
- const std::weak_ptr<NYq::TEvents::TDbResolverResponse> response = DbResolverResponse_;
+ const std::weak_ptr<NYql::TDbResolverResponse> response = DbResolverResponse_;
AsyncFuture_ = State_->DbResolver->ResolveIds(ids).Apply([response](auto future)
{
if (const auto res = response.lock())
@@ -80,7 +78,7 @@ public:
return TStatus::Error;
}
FullResolvedIds_.insert(DbResolverResponse_->DatabaseId2Endpoint.begin(), DbResolverResponse_->DatabaseId2Endpoint.end());
- DbResolverResponse_ = std::make_shared<NYq::TEvents::TDbResolverResponse>();
+ DbResolverResponse_ = std::make_shared<NYql::TDbResolverResponse>();
auto& clusters = State_->Configuration->Clusters;
const auto& id2Clusters = State_->Configuration->DbId2Clusters;
for (const auto& [dbIdWithType, info] : FullResolvedIds_) {
@@ -103,7 +101,7 @@ private:
NThreading::TFuture<void> AsyncFuture_;
TDbId2Endpoint FullResolvedIds_;
- std::shared_ptr<NYq::TEvents::TDbResolverResponse> DbResolverResponse_ = std::make_shared<NYq::TEvents::TDbResolverResponse>();
+ std::shared_ptr<NYql::TDbResolverResponse> DbResolverResponse_ = std::make_shared<NYql::TDbResolverResponse>();
};
}
diff --git a/ydb/library/yql/providers/ydb/provider/yql_ydb_provider.cpp b/ydb/library/yql/providers/ydb/provider/yql_ydb_provider.cpp
index 11aa5060a22..8bb64063b2d 100644
--- a/ydb/library/yql/providers/ydb/provider/yql_ydb_provider.cpp
+++ b/ydb/library/yql/providers/ydb/provider/yql_ydb_provider.cpp
@@ -7,7 +7,7 @@ namespace NYql {
TDataProviderInitializer GetYdbDataProviderInitializer(
NYdb::TDriver driver,
ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory,
- std::shared_ptr<NYq::IDatabaseAsyncResolver> dbResolver) {
+ std::shared_ptr<NYql::IDatabaseAsyncResolver> dbResolver) {
return [driver, credentialsFactory, dbResolver] (
const TString& userName,
const TString& sessionId,
diff --git a/ydb/library/yql/providers/ydb/provider/yql_ydb_provider.h b/ydb/library/yql/providers/ydb/provider/yql_ydb_provider.h
index 56cd6e93314..1135c62753a 100644
--- a/ydb/library/yql/providers/ydb/provider/yql_ydb_provider.h
+++ b/ydb/library/yql/providers/ydb/provider/yql_ydb_provider.h
@@ -33,14 +33,14 @@ struct TYdbState : public TThrRefBase
TYdbConfiguration::TPtr Configuration = MakeIntrusive<TYdbConfiguration>();
const NKikimr::NMiniKQL::IFunctionRegistry* FunctionRegistry = nullptr;
ISecuredServiceAccountCredentialsFactory::TPtr CredentialsFactory;
- THashMap<std::pair<TString, NYq::DatabaseType>, NYq::TEvents::TDatabaseAuth> DatabaseIds;
- std::shared_ptr<NYq::IDatabaseAsyncResolver> DbResolver;
+ THashMap<std::pair<TString, NYql::DatabaseType>, NYql::TDatabaseAuth> DatabaseIds;
+ std::shared_ptr<NYql::IDatabaseAsyncResolver> DbResolver;
};
TDataProviderInitializer GetYdbDataProviderInitializer(
NYdb::TDriver driver,
ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory = nullptr,
- const std::shared_ptr<NYq::IDatabaseAsyncResolver> dbResolver = nullptr
+ const std::shared_ptr<NYql::IDatabaseAsyncResolver> dbResolver = nullptr
);
TIntrusivePtr<IDataProvider> CreateYdbDataSource(
diff --git a/ydb/library/yql/providers/ydb/provider/yql_ydb_settings.cpp b/ydb/library/yql/providers/ydb/provider/yql_ydb_settings.cpp
index a582395c3c6..b31d717997d 100644
--- a/ydb/library/yql/providers/ydb/provider/yql_ydb_settings.cpp
+++ b/ydb/library/yql/providers/ydb/provider/yql_ydb_settings.cpp
@@ -20,8 +20,8 @@ bool TYdbConfiguration::HasCluster(TStringBuf cluster) const {
void TYdbConfiguration::Init(
const TYdbGatewayConfig& config,
TIntrusivePtr<TTypeAnnotationContext> typeCtx,
- const std::shared_ptr<NYq::IDatabaseAsyncResolver> dbResolver,
- THashMap<std::pair<TString, NYq::DatabaseType>, NYq::TEvents::TDatabaseAuth>& databaseIds)
+ const std::shared_ptr<NYql::IDatabaseAsyncResolver> dbResolver,
+ THashMap<std::pair<TString, NYql::DatabaseType>, NYql::TDatabaseAuth>& databaseIds)
{
TVector<TString> clusters(Reserve(config.ClusterMappingSize()));
for (auto& cluster: config.GetClusterMapping()) {
@@ -43,8 +43,8 @@ void TYdbConfiguration::Init(
if (dbResolver) {
if (cluster.GetId()) {
- databaseIds[std::make_pair(cluster.GetId(), NYq::DatabaseType::Ydb)] =
- NYq::TEvents::TDatabaseAuth{structuredTokenJson, cluster.GetAddBearerToToken()};
+ databaseIds[std::make_pair(cluster.GetId(), NYql::DatabaseType::Ydb)] =
+ NYql::TDatabaseAuth{structuredTokenJson, cluster.GetAddBearerToToken()};
DbId2Clusters[cluster.GetId()].emplace_back(cluster.GetName());
}
}
diff --git a/ydb/library/yql/providers/ydb/provider/yql_ydb_settings.h b/ydb/library/yql/providers/ydb/provider/yql_ydb_settings.h
index 7a27452cc59..1d72a52cd34 100644
--- a/ydb/library/yql/providers/ydb/provider/yql_ydb_settings.h
+++ b/ydb/library/yql/providers/ydb/provider/yql_ydb_settings.h
@@ -5,7 +5,6 @@
#include <ydb/library/yql/providers/common/proto/gateways_config.pb.h>
-#include <ydb/core/yq/libs/events/events.h>
#include <ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver.h>
@@ -34,8 +33,8 @@ struct TYdbConfiguration : public TYdbSettings, public NCommon::TSettingDispatch
void Init(
const TYdbGatewayConfig& config,
TIntrusivePtr<TTypeAnnotationContext> typeCtx,
- const std::shared_ptr<NYq::IDatabaseAsyncResolver> dbResolver,
- THashMap<std::pair<TString, NYq::DatabaseType>, NYq::TEvents::TDatabaseAuth>& databaseIds
+ const std::shared_ptr<NYql::IDatabaseAsyncResolver> dbResolver,
+ THashMap<std::pair<TString, NYql::DatabaseType>, NYql::TDatabaseAuth>& databaseIds
);
bool HasCluster(TStringBuf cluster) const;