diff options
author | dinmukhammed <dinmukhammed@yandex-team.ru> | 2022-02-16 14:46:33 +0300 |
---|---|---|
committer | dinmukhammed <dinmukhammed@yandex-team.ru> | 2022-02-16 14:46:33 +0300 |
commit | f8105a7f3a524325d504a2b519f8f40fc109f6bb (patch) | |
tree | 17f6a691f17cf2e7ee011a8031760f1a8e0302c7 | |
parent | 2f8ea7e2648c1624b5c1af79efe506096c4ce519 (diff) | |
download | ydb-f8105a7f3a524325d504a2b519f8f40fc109f6bb.tar.gz |
YQ-831 Cut dependecy: YQL --> YQ
Fixed deps
ref:09c66d3cc6f83868fa99039a01af629e24778a56
30 files changed, 167 insertions, 216 deletions
diff --git a/ydb/core/yq/libs/actors/database_resolver.cpp b/ydb/core/yq/libs/actors/database_resolver.cpp index 28402cb7d80..59558c22d2d 100644 --- a/ydb/core/yq/libs/actors/database_resolver.cpp +++ b/ydb/core/yq/libs/actors/database_resolver.cpp @@ -2,6 +2,7 @@ #include <ydb/core/yq/libs/events/events.h> #include <ydb/core/yq/libs/common/cache.h> +#include <ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver.h> #include <library/cpp/actors/core/actor_bootstrapped.h> #include <library/cpp/actors/http/http.h> @@ -9,6 +10,7 @@ #include <library/cpp/json/json_reader.h> #include <ydb/core/protos/services.pb.h> + #define LOG_E(stream) \ LOG_ERROR_S(*TlsActivationContext, NKikimrServices::YQL_PROXY, "DatabaseResolver - TraceId: " << TraceId << ": " << stream) @@ -20,10 +22,10 @@ namespace NYq { using namespace NActors; using namespace NYql; -using TEndpoint = TEvents::TDbResolverResponse::TEndpoint; +using TEndpoint = NYql::TDbResolverResponse::TEndpoint; -using TParsers = THashMap<DatabaseType, std::function<TEndpoint(NJson::TJsonValue& body, bool)>>; -using TCache = TTtlCache<std::tuple<TString, DatabaseType, TEvents::TDatabaseAuth>, std::variant<TEndpoint, TString>>; +using TParsers = THashMap<NYql::DatabaseType, std::function<TEndpoint(NJson::TJsonValue& body, bool)>>; +using TCache = TTtlCache<std::tuple<TString, NYql::DatabaseType, NYql::TDatabaseAuth>, std::variant<TEndpoint, TString>>; TString TransformMdbHostToCorrectFormat(const TString& mdbHost) { return mdbHost.substr(0, mdbHost.find('.')) + ".db.yandex.net:8443"; @@ -39,8 +41,8 @@ public: TResponseProcessor( const TActorId sender, TCache& cache, - const THashMap<std::pair<TString, DatabaseType>, TEndpoint>& ready, - const THashMap<NHttp::THttpOutgoingRequestPtr, std::tuple<TString, DatabaseType, TEvents::TDatabaseAuth>>& requests, + const THashMap<std::pair<TString, NYql::DatabaseType>, TEndpoint>& ready, + const THashMap<NHttp::THttpOutgoingRequestPtr, std::tuple<TString, NYql::DatabaseType, NYql::TDatabaseAuth>>& requests, const TString& traceId, bool mdbTransformHost, const TParsers& parsers) @@ -103,7 +105,7 @@ private: } Send(Sender, new TEvents::TEvEndpointResponse( - TEvents::TDbResolverResponse(std::move(DatabaseId2Endpoint), Success, issues))); + NYql::TDbResolverResponse(std::move(DatabaseId2Endpoint), Success, issues))); PassAway(); } @@ -112,8 +114,8 @@ private: TString status; TString errorMessage; TString databaseId; - DatabaseType databaseType = DatabaseType::Ydb; - TEvents::TDatabaseAuth info; + NYql::DatabaseType databaseType = DatabaseType::Ydb; + NYql::TDatabaseAuth info; TMaybe<TEndpoint> result; HandledIds++; if (ev->Get()->Error.empty() && (ev->Get()->Response && ((status = ev->Get()->Response->Status) == "200"))) { @@ -131,20 +133,23 @@ private: if (parseJsonOk && (parserIt = Parsers.find(databaseType)) != Parsers.end()) { try { auto res = parserIt->second(databaseInfo, MdbTransformHost); - LOG_D("Got " << databaseId << " " << databaseType << " endpoint " << res.Endpoint << " " << res.Database); + LOG_D("Got db_id: " << databaseId + << ", db type: " << static_cast<std::underlying_type<NYql::DatabaseType>::type>(databaseType) + << ", endpoint: " << res.Endpoint + << ", database: " << res.Database); DatabaseId2Endpoint[std::make_pair(databaseId, databaseType)] = res; result.ConstructInPlace(res); } catch (...) { errorMessage = TStringBuilder() << " Couldn't resolve " - << databaseType << " Id: " - << databaseId << "\n" + << "databaseId: " << databaseId + << ", db type: " << static_cast<std::underlying_type<NYql::DatabaseType>::type>(databaseType) << "\n" << CurrentExceptionMessage(); } } else { errorMessage = TStringBuilder() << "Unable to parse database information." << "Database Id: " << databaseId - << ", Database Type: " << databaseType; + << ", db type: " << static_cast<std::underlying_type<NYql::DatabaseType>::type>(databaseType); } } } else { @@ -182,10 +187,10 @@ private: private: const TActorId Sender; TCache& Cache; - const THashMap<NHttp::THttpOutgoingRequestPtr, std::tuple<TString, DatabaseType, TEvents::TDatabaseAuth>> Requests; + const THashMap<NHttp::THttpOutgoingRequestPtr, std::tuple<TString, NYql::DatabaseType, NYql::TDatabaseAuth>> Requests; const TString TraceId; const bool MdbTransformHost; - THashMap<std::pair<TString, DatabaseType>, TEndpoint> DatabaseId2Endpoint; + THashMap<std::pair<TString, NYql::DatabaseType>, TEndpoint> DatabaseId2Endpoint; size_t HandledIds = 0; bool Success = true; const TParsers& Parsers; @@ -226,8 +231,8 @@ public: Y_ENSURE(endpoint); return TEndpoint{endpoint, database, secure}; }; - Parsers[DatabaseType::Ydb] = ydbParser; - Parsers[DatabaseType::DataStreams] = [ydbParser](NJson::TJsonValue& databaseInfo, bool mdbTransformHost) + Parsers[NYql::DatabaseType::Ydb] = ydbParser; + Parsers[NYql::DatabaseType::DataStreams] = [ydbParser](NJson::TJsonValue& databaseInfo, bool mdbTransformHost) { auto ret = ydbParser(databaseInfo, mdbTransformHost); // TODO: Take explicit field from MVP @@ -237,7 +242,7 @@ public: } return ret; }; - Parsers[DatabaseType::ClickHouse] = [](NJson::TJsonValue& databaseInfo, bool mdbTransformHost) { + Parsers[NYql::DatabaseType::ClickHouse] = [](NJson::TJsonValue& databaseInfo, bool mdbTransformHost) { TString endpoint; TVector<TString> aliveHosts; for (const auto& host : databaseInfo.GetMap().at("hosts").GetArraySafe()) { @@ -266,7 +271,7 @@ private: void SendResponse( const NActors::TActorId& recipient, - THashMap<std::pair<TString, DatabaseType>, TEndpoint>&& ready, + THashMap<std::pair<TString, NYql::DatabaseType>, TEndpoint>&& ready, bool success = true, const TString& errorMessage = "") { @@ -275,15 +280,15 @@ private: if (errorMessage) issues.AddIssue(errorMessage); Send(recipient, - new TEvents::TEvEndpointResponse(TEvents::TDbResolverResponse{std::move(ready), success, issues})); + new TEvents::TEvEndpointResponse(NYql::TDbResolverResponse{std::move(ready), success, issues})); } void Handle(TEvents::TEvEndpointRequest::TPtr ev, const TActorContext& ctx) { TraceId = ev->Get()->TraceId; LOG_D("Start databaseId resolver for " << ev->Get()->DatabaseIds.size() << " ids"); - THashMap<NHttp::THttpOutgoingRequestPtr, std::tuple<TString, DatabaseType, TEvents::TDatabaseAuth>> requests; // request, (dbId, type, info) - THashMap<std::pair<TString, DatabaseType>, TEndpoint> ready; + THashMap<NHttp::THttpOutgoingRequestPtr, std::tuple<TString, NYql::DatabaseType, NYql::TDatabaseAuth>> requests; // request, (dbId, type, info) + THashMap<std::pair<TString, NYql::DatabaseType>, TEndpoint> ready; for (const auto& [p, info] : ev->Get()->DatabaseIds) { const auto& [databaseId, type] = p; TMaybe<std::variant<TEndpoint, TString>> cacheVal; @@ -306,7 +311,7 @@ private: } try { - TString url = IsIn({ DatabaseType::Ydb, DatabaseType::DataStreams }, type) ? + TString url = IsIn({NYql::DatabaseType::Ydb, NYql::DatabaseType::DataStreams }, type) ? ev->Get()->YdbMvpEndpoint + "/database?databaseId=" + databaseId : ev->Get()->MdbGateway + "/managed-clickhouse/v1/clusters/" + databaseId + "/hosts"; LOG_D("Get '" << url << "'"); diff --git a/ydb/core/yq/libs/actors/run_actor.cpp b/ydb/core/yq/libs/actors/run_actor.cpp index 0ca3332432e..2643150d88f 100644 --- a/ydb/core/yq/libs/actors/run_actor.cpp +++ b/ydb/core/yq/libs/actors/run_actor.cpp @@ -51,6 +51,7 @@ #include <library/cpp/actors/core/actor_bootstrapped.h> #include <library/cpp/actors/core/log.h> #include <ydb/core/yq/libs/common/entity_id.h> +#include <ydb/core/yq/libs/db_id_async_resolver_impl/db_async_resolver_impl.h> #include <ydb/core/yq/libs/actors/nodes_manager.h> #include <ydb/core/yq/libs/gateway/empty_gateway.h> #include <ydb/core/yq/libs/read_rule/read_rule_creator.h> @@ -68,7 +69,6 @@ #include <ydb/core/yq/libs/checkpointing/checkpoint_coordinator.h> #include <ydb/core/yq/libs/checkpointing_common/defs.h> #include <ydb/core/yq/libs/checkpoint_storage/storage_service.h> -#include <ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver_impl.h> #include <ydb/core/yq/libs/private_client/private_client.h> #define LOG_E(stream) \ diff --git a/ydb/core/yq/libs/actors/ya.make b/ydb/core/yq/libs/actors/ya.make index 78d6d09aca5..15edfaccf4e 100644 --- a/ydb/core/yq/libs/actors/ya.make +++ b/ydb/core/yq/libs/actors/ya.make @@ -36,6 +36,7 @@ PEERDIR( ydb/core/yq/libs/actors/logging ydb/core/yq/libs/checkpointing ydb/core/yq/libs/checkpointing_common + ydb/core/yq/libs/db_id_async_resolver_impl ydb/core/yq/libs/common ydb/core/yq/libs/control_plane_storage ydb/core/yq/libs/control_plane_storage/events diff --git a/ydb/core/yq/libs/common/database_token_builder.cpp b/ydb/core/yq/libs/common/database_token_builder.cpp deleted file mode 100644 index 89d36cfba6f..00000000000 --- a/ydb/core/yq/libs/common/database_token_builder.cpp +++ /dev/null @@ -1,45 +0,0 @@ -#include "database_token_builder.h" -#include <ydb/library/yql/providers/common/structured_token/yql_token_builder.h> - -namespace NYq { - -using namespace NYql; - - TString BuildStructuredToken(const YandexQuery::IamAuth& auth, const TString& authToken, const THashMap<TString, TString>& accountIdSignatures) { - TStructuredTokenBuilder result; - switch (auth.identity_case()) { - case YandexQuery::IamAuth::kCurrentIam: - result.SetIAMToken(authToken); - break; - case YandexQuery::IamAuth::kServiceAccount: { - const auto& signature = accountIdSignatures.at(auth.service_account().id()); - result.SetServiceAccountIdAuth(auth.service_account().id(), signature); - break; - } - default: - result.SetNoAuth(); - break; - } - - return result.ToJson(); - } - - void TryAddDatabaseToResolve( - const YandexQuery::IamAuth& auth, - const TString& databaseId, - DatabaseType type, - const TString& authToken, - const THashMap<TString, TString>& accountIdSignatures, - THashMap<std::pair<TString, DatabaseType>, TEvents::TDatabaseAuth>& databaseIds) { - if (!databaseId) { - return; - } - - TEvents::TDatabaseAuth info; - info.StructuredToken = BuildStructuredToken(auth, authToken, accountIdSignatures); - info.AddBearerToToken = true; // XXX - databaseIds[std::make_pair(databaseId, type)] = info; - } - -} // NYq - diff --git a/ydb/core/yq/libs/common/database_token_builder.h b/ydb/core/yq/libs/common/database_token_builder.h deleted file mode 100644 index a3e00efab5a..00000000000 --- a/ydb/core/yq/libs/common/database_token_builder.h +++ /dev/null @@ -1,24 +0,0 @@ -#pragma once - -#include <ydb/core/yq/libs/control_plane_storage/events/events.h> -#include <ydb/core/yq/libs/events/events.h> - -namespace NYq { - -using namespace NActors; -using namespace NYql; - - TString BuildStructuredToken( - const YandexQuery::IamAuth& auth, - const TString& authToken, - const THashMap<TString, TString>& accountIdSignatures); - - void TryAddDatabaseToResolve( - const YandexQuery::IamAuth& auth, - const TString& databaseId, - DatabaseType type, - const TString& authToken, - const THashMap<TString, TString>& accountIdSignatures, - THashMap<std::pair<TString, DatabaseType>, TEvents::TDatabaseAuth>& databaseIds); - -} // NYq diff --git a/ydb/core/yq/libs/common/ya.make b/ydb/core/yq/libs/common/ya.make index 4622695700a..d2527b63473 100644 --- a/ydb/core/yq/libs/common/ya.make +++ b/ydb/core/yq/libs/common/ya.make @@ -3,7 +3,6 @@ OWNER(g:yq) LIBRARY() SRCS( - database_token_builder.cpp entity_id.cpp rows_proto_splitter.cpp ) diff --git a/ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver_impl.cpp b/ydb/core/yq/libs/db_id_async_resolver_impl/db_async_resolver_impl.cpp index e13093b4637..242fcf86c93 100644 --- a/ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver_impl.cpp +++ b/ydb/core/yq/libs/db_id_async_resolver_impl/db_async_resolver_impl.cpp @@ -18,10 +18,10 @@ TDatabaseAsyncResolverImpl::TDatabaseAsyncResolverImpl( , TraceId(traceId) {} -TFuture<TEvents::TDbResolverResponse> TDatabaseAsyncResolverImpl::ResolveIds( - const THashMap<std::pair<TString, DatabaseType>, TEvents::TDatabaseAuth>& ids) const +TFuture<NYql::TDbResolverResponse> TDatabaseAsyncResolverImpl::ResolveIds( + const THashMap<std::pair<TString, NYql::DatabaseType>, NYql::TDatabaseAuth>& ids) const { - auto promise = NewPromise<TEvents::TDbResolverResponse>(); + auto promise = NewPromise<NYql::TDbResolverResponse>(); TDuration timeout = TDuration::Seconds(40); auto callback = MakeHolder<NYql::TRichActorFutureCallback<TEvents::TEvEndpointResponse>>( [promise] (TAutoPtr<NActors::TEventHandle<TEvents::TEvEndpointResponse>>& event) mutable { diff --git a/ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver_impl.h b/ydb/core/yq/libs/db_id_async_resolver_impl/db_async_resolver_impl.h index 10ca0c956e4..22c248d6076 100644 --- a/ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver_impl.h +++ b/ydb/core/yq/libs/db_id_async_resolver_impl/db_async_resolver_impl.h @@ -1,10 +1,11 @@ #pragma once -#include "db_async_resolver.h" +#include <ydb/core/yq/libs/events/events.h> +#include <ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver.h> #include <ydb/library/yql/providers/dq/actors/actor_helpers.h> namespace NYq { -class TDatabaseAsyncResolverImpl : public IDatabaseAsyncResolver { +class TDatabaseAsyncResolverImpl : public NYql::IDatabaseAsyncResolver { public: TDatabaseAsyncResolverImpl( NActors::TActorSystem* actorSystem, @@ -15,8 +16,8 @@ public: const TString& traceId = "" ); - NThreading::TFuture<TEvents::TDbResolverResponse> ResolveIds( - const THashMap<std::pair<TString, DatabaseType>, TEvents::TDatabaseAuth>& ids) const override; + NThreading::TFuture<NYql::TDbResolverResponse> ResolveIds( + const THashMap<std::pair<TString, NYql::DatabaseType>, NYql::TDatabaseAuth>& ids) const override; private: NActors::TActorSystem* ActorSystem; const NActors::TActorId Recipient; diff --git a/ydb/core/yq/libs/db_id_async_resolver_impl/ya.make b/ydb/core/yq/libs/db_id_async_resolver_impl/ya.make new file mode 100644 index 00000000000..505c600686b --- /dev/null +++ b/ydb/core/yq/libs/db_id_async_resolver_impl/ya.make @@ -0,0 +1,18 @@ +OWNER(g:yq) + +LIBRARY() + +SRCS( + db_async_resolver_impl.cpp +) + +PEERDIR( + library/cpp/threading/future + ydb/core/yq/libs/events + ydb/library/yql/providers/common/db_id_async_resolver + ydb/library/yql/providers/dq/actors +) + +YQL_LAST_ABI_VERSION() + +END() diff --git a/ydb/core/yq/libs/events/events.h b/ydb/core/yq/libs/events/events.h index 36c864dca8d..0ba106b33aa 100644 --- a/ydb/core/yq/libs/events/events.h +++ b/ydb/core/yq/libs/events/events.h @@ -2,6 +2,7 @@ #include "event_ids.h" #include <ydb/library/yql/core/facade/yql_facade.h> +#include <ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver.h> #include <ydb/library/yql/providers/dq/provider/yql_dq_gateway.h> #include <ydb/library/yql/public/issue/yql_issue.h> @@ -18,13 +19,6 @@ namespace NYq { using NYdb::NYq::TScope; -enum class DatabaseType { - Ydb, - ClickHouse, - DataStreams, - ObjectStorage -}; - struct TQueryResult { TVector<Ydb::ResultSet> Sets; TInstant ExpirationDeadline; @@ -126,54 +120,21 @@ struct TEvents { {} }; - struct TDbResolverResponse { - struct TEndpoint { - TString Endpoint; - TString Database; - bool Secure = false; - }; - - TDbResolverResponse() = default; - - TDbResolverResponse( - THashMap<std::pair<TString, DatabaseType>, TEndpoint>&& databaseId2Endpoint, - bool success = false, - const NYql::TIssues& issues = {}) - : DatabaseId2Endpoint(databaseId2Endpoint) - , Success(success) - , Issues(issues) {} - - THashMap<std::pair<TString, DatabaseType>, TEndpoint> DatabaseId2Endpoint; - bool Success = false; - NYql::TIssues Issues; - }; - struct TEvEndpointResponse : NActors::TEventLocal<TEvEndpointResponse, TEventIds::EvEndpointResponse> { - TDbResolverResponse DbResolverResponse; - explicit TEvEndpointResponse(TDbResolverResponse&& response) noexcept : DbResolverResponse(std::move(response)) {} + NYql::TDbResolverResponse DbResolverResponse; + explicit TEvEndpointResponse(NYql::TDbResolverResponse&& response) noexcept : DbResolverResponse(std::move(response)) {} }; - struct TDatabaseAuth { - TString StructuredToken; - bool AddBearerToToken = false; - - bool operator==(const TDatabaseAuth& other) const { - return std::tie(StructuredToken, AddBearerToToken) == std::tie(other.StructuredToken, other.AddBearerToToken); - } - bool operator!=(const TDatabaseAuth& other) const { - return !(*this == other); - } - }; struct TEvEndpointRequest : NActors::TEventLocal<TEvEndpointRequest, TEventIds::EvEndpointRequest> { - THashMap<std::pair<TString, DatabaseType>, TDatabaseAuth> DatabaseIds; // DbId, DatabaseType => database auth + THashMap<std::pair<TString, NYql::DatabaseType>, NYql::TDatabaseAuth> DatabaseIds; // DbId, DatabaseType => database auth TString YdbMvpEndpoint; TString MdbGateway; TString TraceId; bool MdbTransformHost; TEvEndpointRequest( - const THashMap<std::pair<TString, DatabaseType>, TDatabaseAuth>& databaseIds, + const THashMap<std::pair<TString, NYql::DatabaseType>, NYql::TDatabaseAuth>& databaseIds, const TString& ydbMvpEndpoint, const TString& mdbGateway, const TString& traceId, @@ -193,6 +154,7 @@ struct TEvents { } NYql::TIssues Issues; + NYql::TIssues TransientIssues; }; struct TEvDataStreamsReadRulesDeletionResult : NActors::TEventLocal<TEvDataStreamsReadRulesDeletionResult, TEventIds::EvDataStreamsReadRulesDeletionResult> { @@ -261,8 +223,8 @@ struct TEvents { } // namespace NYq template<> -struct THash<NYq::TEvents::TDatabaseAuth> { - inline ui64 operator()(const NYq::TEvents::TDatabaseAuth& x) const noexcept { +struct THash<NYql::TDatabaseAuth> { + inline ui64 operator()(const NYql::TDatabaseAuth& x) const noexcept { return MultiHash(x.StructuredToken, x.AddBearerToToken); } }; diff --git a/ydb/core/yq/libs/events/ya.make b/ydb/core/yq/libs/events/ya.make index c12bdd681f4..a8310348683 100644 --- a/ydb/core/yq/libs/events/ya.make +++ b/ydb/core/yq/libs/events/ya.make @@ -14,6 +14,7 @@ PEERDIR( ydb/public/api/protos ydb/public/lib/yq ydb/public/sdk/cpp/client/ydb_table + ydb/library/yql/providers/common/db_id_async_resolver ydb/library/yql/providers/dq/provider ) diff --git a/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_io_discovery.cpp b/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_io_discovery.cpp index 704431b1fb0..ed5db76023f 100644 --- a/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_io_discovery.cpp +++ b/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_io_discovery.cpp @@ -6,7 +6,6 @@ #include <ydb/library/yql/core/yql_graph_transformer.h> #include <ydb/library/yql/ast/yql_expr.h> -#include <ydb/core/yq/libs/events/events.h> namespace NYql { @@ -16,7 +15,7 @@ using namespace NNodes; class TClickHouseIODiscoveryTransformer : public TGraphTransformerBase { -using TDbId2Endpoint = THashMap<std::pair<TString, NYq::DatabaseType>, NYq::TEvents::TDbResolverResponse::TEndpoint>; +using TDbId2Endpoint = THashMap<std::pair<TString, NYql::DatabaseType>, NYql::TDbResolverResponse::TEndpoint>; public: TClickHouseIODiscoveryTransformer(TClickHouseState::TPtr state) @@ -32,7 +31,7 @@ public: if (!State_->DbResolver) return TStatus::Ok; - THashMap<std::pair<TString, NYq::DatabaseType>, NYq::TEvents::TDatabaseAuth> ids; + THashMap<std::pair<TString, NYql::DatabaseType>, NYql::TDatabaseAuth> ids; if (auto reads = FindNodes(input, [&](const TExprNode::TPtr& node) { const TExprBase nodeExpr(node); if (!nodeExpr.Maybe<TClRead>()) @@ -51,7 +50,7 @@ public: auto dbId = State_->Configuration->Endpoints[cluster].first; dbId = dbId.substr(0, dbId.find(':')); YQL_CLOG(DEBUG, ProviderClickHouse) << "Found dbId: " << dbId; - const auto idKey = std::make_pair(dbId, NYq::DatabaseType::ClickHouse); + const auto idKey = std::make_pair(dbId, NYql::DatabaseType::ClickHouse); const auto iter = State_->DatabaseIds.find(idKey); if (iter != State_->DatabaseIds.end()) { YQL_CLOG(DEBUG, ProviderClickHouse) << "Resolve CH id: " << dbId; @@ -65,7 +64,7 @@ public: return TStatus::Ok; } - const std::weak_ptr<NYq::TEvents::TDbResolverResponse> response = DbResolverResponse_; + const std::weak_ptr<NYql::TDbResolverResponse> response = DbResolverResponse_; AsyncFuture_ = State_->DbResolver->ResolveIds(ids).Apply([response](auto future) { if (const auto res = response.lock()) *res = std::move(future.ExtractValue()); @@ -85,7 +84,7 @@ public: return TStatus::Error; } FullResolvedIds_.insert(DbResolverResponse_->DatabaseId2Endpoint.begin(), DbResolverResponse_->DatabaseId2Endpoint.end()); - DbResolverResponse_ = std::make_shared<NYq::TEvents::TDbResolverResponse>(); + DbResolverResponse_ = std::make_shared<NYql::TDbResolverResponse>(); YQL_CLOG(DEBUG, ProviderClickHouse) << "ResolvedIds: " << FullResolvedIds_.size(); auto& endpoints = State_->Configuration->Endpoints; const auto& id2Clusters = State_->Configuration->DbId2Clusters; @@ -115,7 +114,7 @@ private: NThreading::TFuture<void> AsyncFuture_; TDbId2Endpoint FullResolvedIds_; - std::shared_ptr<NYq::TEvents::TDbResolverResponse> DbResolverResponse_ = std::make_shared<NYq::TEvents::TDbResolverResponse>(); + std::shared_ptr<NYql::TDbResolverResponse> DbResolverResponse_ = std::make_shared<NYql::TDbResolverResponse>(); }; } diff --git a/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_provider.cpp b/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_provider.cpp index 3cdfb17866e..3f309849333 100644 --- a/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_provider.cpp +++ b/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_provider.cpp @@ -6,7 +6,7 @@ namespace NYql { TDataProviderInitializer GetClickHouseDataProviderInitializer( IHTTPGateway::TPtr gateway, - const std::shared_ptr<NYq::IDatabaseAsyncResolver> dbResolver) + const std::shared_ptr<NYql::IDatabaseAsyncResolver> dbResolver) { return [gateway, dbResolver] ( const TString& userName, diff --git a/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_provider.h b/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_provider.h index 17ebb38b7b3..849ed534ed8 100644 --- a/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_provider.h +++ b/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_provider.h @@ -27,13 +27,13 @@ struct TClickHouseState : public TThrRefBase TTypeAnnotationContext* Types = nullptr; TClickHouseConfiguration::TPtr Configuration = MakeIntrusive<TClickHouseConfiguration>(); const NKikimr::NMiniKQL::IFunctionRegistry* FunctionRegistry = nullptr; - THashMap<std::pair<TString, NYq::DatabaseType>, NYq::TEvents::TDatabaseAuth> DatabaseIds; - std::shared_ptr<NYq::IDatabaseAsyncResolver> DbResolver; + THashMap<std::pair<TString, NYql::DatabaseType>, NYql::TDatabaseAuth> DatabaseIds; + std::shared_ptr<NYql::IDatabaseAsyncResolver> DbResolver; }; TDataProviderInitializer GetClickHouseDataProviderInitializer( IHTTPGateway::TPtr gateway, - std::shared_ptr<NYq::IDatabaseAsyncResolver> dbResolver = nullptr + std::shared_ptr<NYql::IDatabaseAsyncResolver> dbResolver = nullptr ); TIntrusivePtr<IDataProvider> CreateClickHouseDataSource(TClickHouseState::TPtr state, IHTTPGateway::TPtr gateway); diff --git a/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_settings.h b/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_settings.h index c206844b4f9..bcf1acd4f2d 100644 --- a/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_settings.h +++ b/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_settings.h @@ -6,8 +6,6 @@ #include <ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver.h> #include <ydb/library/yql/providers/common/proto/gateways_config.pb.h> -#include <ydb/core/yq/libs/events/events.h> - namespace NYql { @@ -24,8 +22,8 @@ struct TClickHouseConfiguration : public TClickHouseSettings, public NCommon::TS template <typename TProtoConfig> void Init( const TProtoConfig& config, - const std::shared_ptr<NYq::IDatabaseAsyncResolver> dbResolver, - THashMap<std::pair<TString, NYq::DatabaseType>, NYq::TEvents::TDatabaseAuth>& databaseIds) + const std::shared_ptr<NYql::IDatabaseAsyncResolver> dbResolver, + THashMap<std::pair<TString, NYql::DatabaseType>, NYql::TDatabaseAuth>& databaseIds) { TVector<TString> clusters(Reserve(config.ClusterMappingSize())); for (auto& cluster: config.GetClusterMapping()) { @@ -41,8 +39,8 @@ struct TClickHouseConfiguration : public TClickHouseSettings, public NCommon::TS if (dbResolver) { YQL_CLOG(DEBUG, ProviderClickHouse) << "Settings: clusterName = " << cluster.GetName() << ", clusterDbId = " << cluster.GetId() << ", cluster.GetCluster(): " << cluster.GetCluster() << ", HasCluster: " << (cluster.HasCluster() ? "TRUE" : "FALSE") ; - databaseIds[std::make_pair(cluster.GetId(), NYq::DatabaseType::ClickHouse)] = - NYq::TEvents::TDatabaseAuth{cluster.GetCHToken(), /*AddBearer=*/false}; + databaseIds[std::make_pair(cluster.GetId(), NYql::DatabaseType::ClickHouse)] = + NYql::TDatabaseAuth{cluster.GetCHToken(), /*AddBearer=*/false}; if (cluster.GetId()) { DbId2Clusters[cluster.GetId()].emplace_back(cluster.GetName()); YQL_CLOG(DEBUG, ProviderClickHouse) << "Add dbId: " << cluster.GetId() << " to DbId2Clusters"; diff --git a/ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver.h b/ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver.h index 733bed54b02..bf4983ad13f 100644 --- a/ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver.h +++ b/ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver.h @@ -1,16 +1,57 @@ #pragma once +#include <ydb/library/yql/public/issue/yql_issue.h> + #include <library/cpp/threading/future/future.h> -#include <ydb/core/yq/libs/events/events.h> +namespace NYql { + +enum class DatabaseType { + Ydb, + ClickHouse, + DataStreams, + ObjectStorage +}; + +struct TDatabaseAuth { + TString StructuredToken; + bool AddBearerToToken = false; + + bool operator==(const TDatabaseAuth& other) const { + return std::tie(StructuredToken, AddBearerToToken) == std::tie(other.StructuredToken, other.AddBearerToToken); + } + bool operator!=(const TDatabaseAuth& other) const { + return !(*this == other); + } +}; + +struct TDbResolverResponse { + struct TEndpoint { + TString Endpoint; + TString Database; + bool Secure = false; + }; + + TDbResolverResponse() = default; -namespace NYq { + TDbResolverResponse( + THashMap<std::pair<TString, DatabaseType>, TEndpoint>&& databaseId2Endpoint, + bool success = false, + const NYql::TIssues& issues = {}) + : DatabaseId2Endpoint(std::move(databaseId2Endpoint)) + , Success(success) + , Issues(issues) {} + + THashMap<std::pair<TString, DatabaseType>, TEndpoint> DatabaseId2Endpoint; + bool Success = false; + NYql::TIssues Issues; +}; class IDatabaseAsyncResolver { public: - virtual NThreading::TFuture<TEvents::TDbResolverResponse> ResolveIds( - const THashMap<std::pair<TString, DatabaseType>, TEvents::TDatabaseAuth>& ids) const = 0; + virtual NThreading::TFuture<NYql::TDbResolverResponse> ResolveIds( + const THashMap<std::pair<TString, DatabaseType>, NYql::TDatabaseAuth>& ids) const = 0; virtual ~IDatabaseAsyncResolver() = default; }; -} // NYq +} // NYql diff --git a/ydb/library/yql/providers/common/db_id_async_resolver/ya.make b/ydb/library/yql/providers/common/db_id_async_resolver/ya.make index 659fb003039..7d9b2937336 100644 --- a/ydb/library/yql/providers/common/db_id_async_resolver/ya.make +++ b/ydb/library/yql/providers/common/db_id_async_resolver/ya.make @@ -3,13 +3,12 @@ OWNER(g:yq) LIBRARY() SRCS( - db_async_resolver_impl.cpp + db_async_resolver.h ) PEERDIR( library/cpp/threading/future - ydb/core/yq/libs/events - ydb/library/yql/providers/dq/actors + ydb/library/yql/public/issue ) YQL_LAST_ABI_VERSION() diff --git a/ydb/library/yql/providers/pq/provider/yql_pq_datasink_io_discovery.cpp b/ydb/library/yql/providers/pq/provider/yql_pq_datasink_io_discovery.cpp index 349798c9310..e73655b6c26 100644 --- a/ydb/library/yql/providers/pq/provider/yql_pq_datasink_io_discovery.cpp +++ b/ydb/library/yql/providers/pq/provider/yql_pq_datasink_io_discovery.cpp @@ -12,7 +12,7 @@ namespace { using namespace NNodes; class TPqDataSinkIODiscoveryTransformer : public TGraphTransformerBase { -using TDbId2Endpoint = THashMap<std::pair<TString, NYq::DatabaseType>, NYq::TEvents::TDbResolverResponse::TEndpoint>; +using TDbId2Endpoint = THashMap<std::pair<TString, NYql::DatabaseType>, NYql::TDbResolverResponse::TEndpoint>; public: explicit TPqDataSinkIODiscoveryTransformer(TPqState::TPtr state) : State_(state) @@ -28,13 +28,13 @@ public: if (!State_->DbResolver) return TStatus::Ok; - THashMap<std::pair<TString, NYq::DatabaseType>, NYq::TEvents::TDatabaseAuth> ids; + THashMap<std::pair<TString, NYql::DatabaseType>, NYql::TDatabaseAuth> ids; FindYdsDbIdsForResolving(State_, input, ids); if (ids.empty()) return TStatus::Ok; - const std::weak_ptr<NYq::TEvents::TDbResolverResponse> response = DbResolverResponse_; + const std::weak_ptr<NYql::TDbResolverResponse> response = DbResolverResponse_; AsyncFuture_ = State_->DbResolver->ResolveIds(ids).Apply([response](auto future) { if (const auto res = response.lock()) @@ -55,7 +55,7 @@ public: return TStatus::Error; } FullResolvedIds_.insert(DbResolverResponse_->DatabaseId2Endpoint.begin(), DbResolverResponse_->DatabaseId2Endpoint.end()); - DbResolverResponse_ = std::make_shared<NYq::TEvents::TDbResolverResponse>(); + DbResolverResponse_ = std::make_shared<NYql::TDbResolverResponse>(); FillSettingsWithResolvedYdsIds(State_, FullResolvedIds_); return TStatus::Ok; } @@ -64,7 +64,7 @@ private: const TPqState::TPtr State_; NThreading::TFuture<void> AsyncFuture_; TDbId2Endpoint FullResolvedIds_; - std::shared_ptr<NYq::TEvents::TDbResolverResponse> DbResolverResponse_ = std::make_shared<NYq::TEvents::TDbResolverResponse>(); + std::shared_ptr<NYql::TDbResolverResponse> DbResolverResponse_ = std::make_shared<NYql::TDbResolverResponse>(); }; } diff --git a/ydb/library/yql/providers/pq/provider/yql_pq_helpers.cpp b/ydb/library/yql/providers/pq/provider/yql_pq_helpers.cpp index 6b3590ef045..7bc56eef366 100644 --- a/ydb/library/yql/providers/pq/provider/yql_pq_helpers.cpp +++ b/ydb/library/yql/providers/pq/provider/yql_pq_helpers.cpp @@ -29,7 +29,7 @@ TCoNameValueTupleList BuildTopicPropsList(const TPqState::TTopicMeta& meta, TPos void FindYdsDbIdsForResolving( const TPqState::TPtr& state, TExprNode::TPtr input, - THashMap<std::pair<TString, NYq::DatabaseType>, NYq::TEvents::TDatabaseAuth>& ids) + THashMap<std::pair<TString, NYql::DatabaseType>, NYql::TDatabaseAuth>& ids) { if (auto pqNodes = FindNodes(input, [&](const TExprNode::TPtr& node) { if (auto maybePqRead = TMaybeNode<TPqRead>(node)) { @@ -67,7 +67,7 @@ void FindYdsDbIdsForResolving( if (!foundSetting->second.DatabaseId) continue; YQL_CLOG(INFO, ProviderPq) << "Resolve YDS id: " << foundSetting->second.DatabaseId; - const auto idKey = std::make_pair(foundSetting->second.DatabaseId, NYq::DatabaseType::DataStreams); + const auto idKey = std::make_pair(foundSetting->second.DatabaseId, NYql::DatabaseType::DataStreams); const auto foundDbId = state->DatabaseIds.find(idKey); if (foundDbId != state->DatabaseIds.end()) { ids[idKey] = foundDbId->second; @@ -79,7 +79,7 @@ void FindYdsDbIdsForResolving( void FillSettingsWithResolvedYdsIds( const TPqState::TPtr& state, - const THashMap<std::pair<TString, NYq::DatabaseType>, NYq::TEvents::TDbResolverResponse::TEndpoint>& fullResolvedIds) + const THashMap<std::pair<TString, NYql::DatabaseType>, NYql::TDbResolverResponse::TEndpoint>& fullResolvedIds) { YQL_CLOG(INFO, ProviderPq) << "FullResolvedIds size: " << fullResolvedIds.size(); auto& clusters = state->Configuration->ClustersConfigurationSettings; diff --git a/ydb/library/yql/providers/pq/provider/yql_pq_helpers.h b/ydb/library/yql/providers/pq/provider/yql_pq_helpers.h index f1f808fe315..7804e63a34c 100644 --- a/ydb/library/yql/providers/pq/provider/yql_pq_helpers.h +++ b/ydb/library/yql/providers/pq/provider/yql_pq_helpers.h @@ -15,10 +15,10 @@ NNodes::TCoNameValueTupleList BuildTopicPropsList(const TPqState::TTopicMeta& me void FindYdsDbIdsForResolving( const TPqState::TPtr& state, TExprNode::TPtr input, - THashMap<std::pair<TString, NYq::DatabaseType>, NYq::TEvents::TDatabaseAuth>& ids); + THashMap<std::pair<TString, NYql::DatabaseType>, NYql::TDatabaseAuth>& ids); void FillSettingsWithResolvedYdsIds( const TPqState::TPtr& state, - const THashMap<std::pair<TString, NYq::DatabaseType>, NYq::TEvents::TDbResolverResponse::TEndpoint>& fullResolvedIds); + const THashMap<std::pair<TString, NYql::DatabaseType>, NYql::TDbResolverResponse::TEndpoint>& fullResolvedIds); } // namespace NYql diff --git a/ydb/library/yql/providers/pq/provider/yql_pq_io_discovery.cpp b/ydb/library/yql/providers/pq/provider/yql_pq_io_discovery.cpp index ef26fc7a82f..b71a33474ea 100644 --- a/ydb/library/yql/providers/pq/provider/yql_pq_io_discovery.cpp +++ b/ydb/library/yql/providers/pq/provider/yql_pq_io_discovery.cpp @@ -13,7 +13,7 @@ using namespace NNodes; class TPqIODiscoveryTransformer : public TGraphTransformerBase { -using TDbId2Endpoint = THashMap<std::pair<TString, NYq::DatabaseType>, NYq::TEvents::TDbResolverResponse::TEndpoint>; +using TDbId2Endpoint = THashMap<std::pair<TString, NYql::DatabaseType>, NYql::TDbResolverResponse::TEndpoint>; public: explicit TPqIODiscoveryTransformer(TPqState::TPtr state) @@ -30,13 +30,13 @@ public: if (!State_->DbResolver) return TStatus::Ok; - THashMap<std::pair<TString, NYq::DatabaseType>, NYq::TEvents::TDatabaseAuth> ids; + THashMap<std::pair<TString, NYql::DatabaseType>, NYql::TDatabaseAuth> ids; FindYdsDbIdsForResolving(State_, input, ids); if (ids.empty()) return TStatus::Ok; - const std::weak_ptr<NYq::TEvents::TDbResolverResponse> response = DbResolverResponse_; + const std::weak_ptr<NYql::TDbResolverResponse> response = DbResolverResponse_; AsyncFuture_ = State_->DbResolver->ResolveIds(ids).Apply([response](auto future) { if (const auto res = response.lock()) @@ -57,7 +57,7 @@ public: return TStatus::Error; } FullResolvedIds_.insert(DbResolverResponse_->DatabaseId2Endpoint.begin(), DbResolverResponse_->DatabaseId2Endpoint.end()); - DbResolverResponse_ = std::make_shared<NYq::TEvents::TDbResolverResponse>(); + DbResolverResponse_ = std::make_shared<NYql::TDbResolverResponse>(); FillSettingsWithResolvedYdsIds(State_, FullResolvedIds_); return TStatus::Ok; } @@ -66,7 +66,7 @@ private: const TPqState::TPtr State_; NThreading::TFuture<void> AsyncFuture_; TDbId2Endpoint FullResolvedIds_; - std::shared_ptr<NYq::TEvents::TDbResolverResponse> DbResolverResponse_ = std::make_shared<NYq::TEvents::TDbResolverResponse>(); + std::shared_ptr<NYql::TDbResolverResponse> DbResolverResponse_ = std::make_shared<NYql::TDbResolverResponse>(); }; } diff --git a/ydb/library/yql/providers/pq/provider/yql_pq_provider.cpp b/ydb/library/yql/providers/pq/provider/yql_pq_provider.cpp index 5b51d4c0f8c..d7f6cc3828d 100644 --- a/ydb/library/yql/providers/pq/provider/yql_pq_provider.cpp +++ b/ydb/library/yql/providers/pq/provider/yql_pq_provider.cpp @@ -12,7 +12,7 @@ namespace NYql { TDataProviderInitializer GetPqDataProviderInitializer( IPqGateway::TPtr gateway, bool supportRtmrMode, - std::shared_ptr<NYq::IDatabaseAsyncResolver> dbResolver) { + std::shared_ptr<NYql::IDatabaseAsyncResolver> dbResolver) { return [gateway, supportRtmrMode, dbResolver] ( const TString& userName, const TString& sessionId, diff --git a/ydb/library/yql/providers/pq/provider/yql_pq_provider.h b/ydb/library/yql/providers/pq/provider/yql_pq_provider.h index 7e2e39b746c..bc9b4872c28 100644 --- a/ydb/library/yql/providers/pq/provider/yql_pq_provider.h +++ b/ydb/library/yql/providers/pq/provider/yql_pq_provider.h @@ -52,14 +52,14 @@ public: const NKikimr::NMiniKQL::IFunctionRegistry* FunctionRegistry = nullptr; IPqGateway::TPtr Gateway; THolder<IDqIntegration> DqIntegration; - THashMap<std::pair<TString, NYq::DatabaseType>, NYq::TEvents::TDatabaseAuth> DatabaseIds; - std::shared_ptr<NYq::IDatabaseAsyncResolver> DbResolver; + THashMap<std::pair<TString, NYql::DatabaseType>, NYql::TDatabaseAuth> DatabaseIds; + std::shared_ptr<NYql::IDatabaseAsyncResolver> DbResolver; }; TDataProviderInitializer GetPqDataProviderInitializer( IPqGateway::TPtr gateway, bool supportRtmrMode = false, - std::shared_ptr<NYq::IDatabaseAsyncResolver> dbResolver = nullptr + std::shared_ptr<NYql::IDatabaseAsyncResolver> dbResolver = nullptr ); } // namespace NYql diff --git a/ydb/library/yql/providers/pq/provider/yql_pq_settings.cpp b/ydb/library/yql/providers/pq/provider/yql_pq_settings.cpp index e4dead34134..c44b0ce71f4 100644 --- a/ydb/library/yql/providers/pq/provider/yql_pq_settings.cpp +++ b/ydb/library/yql/providers/pq/provider/yql_pq_settings.cpp @@ -17,8 +17,8 @@ TPqSettings::TConstPtr TPqConfiguration::Snapshot() const { void TPqConfiguration::Init( const TPqGatewayConfig& config, TIntrusivePtr<TTypeAnnotationContext> typeCtx, - const std::shared_ptr<NYq::IDatabaseAsyncResolver> dbResolver, - THashMap<std::pair<TString, NYq::DatabaseType>, NYq::TEvents::TDatabaseAuth>& databaseIds) + const std::shared_ptr<NYql::IDatabaseAsyncResolver> dbResolver, + THashMap<std::pair<TString, NYql::DatabaseType>, NYql::TDatabaseAuth>& databaseIds) { TVector<TString> clusters(Reserve(config.ClusterMappingSize())); for (auto& cluster: config.GetClusterMapping()) { @@ -52,8 +52,8 @@ void TPqConfiguration::Init( YQL_CLOG(DEBUG, ProviderPq) << "Settings: clusterName = " << cluster.GetName() << ", clusterDbId = " << cluster.GetDatabaseId() << ", cluster.GetEndpoint(): " << cluster.GetEndpoint() << ", HasEndpoint = " << (cluster.HasEndpoint() ? "TRUE" : "FALSE") ; if (cluster.GetDatabaseId()) { - databaseIds[std::make_pair(cluster.GetDatabaseId(), NYq::DatabaseType::DataStreams)] = - NYq::TEvents::TDatabaseAuth{structuredTokenJson, cluster.GetAddBearerToToken()}; + databaseIds[std::make_pair(cluster.GetDatabaseId(), NYql::DatabaseType::DataStreams)] = + NYql::TDatabaseAuth{structuredTokenJson, cluster.GetAddBearerToToken()}; DbId2Clusters[cluster.GetDatabaseId()].emplace_back(cluster.GetName()); YQL_CLOG(DEBUG, ProviderPq) << "Add dbId: " << cluster.GetDatabaseId() << " to DbId2Clusters"; } diff --git a/ydb/library/yql/providers/pq/provider/yql_pq_settings.h b/ydb/library/yql/providers/pq/provider/yql_pq_settings.h index 1337d7dac24..382af9c82b7 100644 --- a/ydb/library/yql/providers/pq/provider/yql_pq_settings.h +++ b/ydb/library/yql/providers/pq/provider/yql_pq_settings.h @@ -6,7 +6,6 @@ #include <ydb/library/yql/providers/common/config/yql_setting.h> #include <ydb/library/yql/providers/common/proto/gateways_config.pb.h> -#include <ydb/core/yq/libs/events/events.h> #include <ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver.h> namespace NYql { @@ -41,8 +40,8 @@ struct TPqConfiguration : public TPqSettings, public NCommon::TSettingDispatcher void Init( const TPqGatewayConfig& config, TIntrusivePtr<TTypeAnnotationContext> typeCtx, - const std::shared_ptr<NYq::IDatabaseAsyncResolver> dbResolver, - THashMap<std::pair<TString, NYq::DatabaseType>, NYq::TEvents::TDatabaseAuth>& databaseIds); + const std::shared_ptr<NYql::IDatabaseAsyncResolver> dbResolver, + THashMap<std::pair<TString, NYql::DatabaseType>, NYql::TDatabaseAuth>& databaseIds); TString GetDatabaseForTopic(const TString& cluster) const; diff --git a/ydb/library/yql/providers/ydb/provider/yql_ydb_io_discovery.cpp b/ydb/library/yql/providers/ydb/provider/yql_ydb_io_discovery.cpp index ff0c5fd5087..36e9890a55b 100644 --- a/ydb/library/yql/providers/ydb/provider/yql_ydb_io_discovery.cpp +++ b/ydb/library/yql/providers/ydb/provider/yql_ydb_io_discovery.cpp @@ -1,5 +1,4 @@ #include "yql_ydb_provider_impl.h" -#include <ydb/core/yq/libs/events/events.h> #include <ydb/library/yql/providers/ydb/expr_nodes/yql_ydb_expr_nodes.h> #include <ydb/library/yql/providers/common/provider/yql_provider_names.h> @@ -10,7 +9,6 @@ #include <ydb/library/yql/utils/log/log.h> #include <ydb/library/yql/public/udf/udf_types.h> #include <ydb/library/yql/ast/yql_expr.h> -#include <ydb/core/yq/libs/events/events.h> namespace NYql { @@ -19,7 +17,7 @@ namespace { using namespace NNodes; class TYdbIODiscoveryTransformer : public TGraphTransformerBase { -using TDbId2Endpoint = THashMap<std::pair<TString, NYq::DatabaseType>, NYq::TEvents::TDbResolverResponse::TEndpoint>; +using TDbId2Endpoint = THashMap<std::pair<TString, NYql::DatabaseType>, NYql::TDbResolverResponse::TEndpoint>; public: TYdbIODiscoveryTransformer(TYdbState::TPtr state) : State_(std::move(state)) @@ -34,7 +32,7 @@ public: if (!State_->DbResolver) return TStatus::Ok; - THashMap<std::pair<TString, NYq::DatabaseType>, NYq::TEvents::TDatabaseAuth> ids; + THashMap<std::pair<TString, NYql::DatabaseType>, NYql::TDatabaseAuth> ids; if (auto reads = FindNodes(input, [&](const TExprNode::TPtr& node) { const TExprBase nodeExpr(node); if (!nodeExpr.Maybe<TYdbRead>()) @@ -49,7 +47,7 @@ public: const TYdbRead read(node); const auto& cluster = read.DataSource().Cluster().StringValue(); const auto& dbId = State_->Configuration->Clusters[cluster].DatabaseId; - const auto idKey = std::make_pair(dbId, NYq::DatabaseType::Ydb); + const auto idKey = std::make_pair(dbId, NYql::DatabaseType::Ydb); const auto iter = State_->DatabaseIds.find(idKey); if (iter != State_->DatabaseIds.end()) { ids[idKey] = iter->second; @@ -59,7 +57,7 @@ public: if (ids.empty()) { return TStatus::Ok; } - const std::weak_ptr<NYq::TEvents::TDbResolverResponse> response = DbResolverResponse_; + const std::weak_ptr<NYql::TDbResolverResponse> response = DbResolverResponse_; AsyncFuture_ = State_->DbResolver->ResolveIds(ids).Apply([response](auto future) { if (const auto res = response.lock()) @@ -80,7 +78,7 @@ public: return TStatus::Error; } FullResolvedIds_.insert(DbResolverResponse_->DatabaseId2Endpoint.begin(), DbResolverResponse_->DatabaseId2Endpoint.end()); - DbResolverResponse_ = std::make_shared<NYq::TEvents::TDbResolverResponse>(); + DbResolverResponse_ = std::make_shared<NYql::TDbResolverResponse>(); auto& clusters = State_->Configuration->Clusters; const auto& id2Clusters = State_->Configuration->DbId2Clusters; for (const auto& [dbIdWithType, info] : FullResolvedIds_) { @@ -103,7 +101,7 @@ private: NThreading::TFuture<void> AsyncFuture_; TDbId2Endpoint FullResolvedIds_; - std::shared_ptr<NYq::TEvents::TDbResolverResponse> DbResolverResponse_ = std::make_shared<NYq::TEvents::TDbResolverResponse>(); + std::shared_ptr<NYql::TDbResolverResponse> DbResolverResponse_ = std::make_shared<NYql::TDbResolverResponse>(); }; } diff --git a/ydb/library/yql/providers/ydb/provider/yql_ydb_provider.cpp b/ydb/library/yql/providers/ydb/provider/yql_ydb_provider.cpp index 11aa5060a22..8bb64063b2d 100644 --- a/ydb/library/yql/providers/ydb/provider/yql_ydb_provider.cpp +++ b/ydb/library/yql/providers/ydb/provider/yql_ydb_provider.cpp @@ -7,7 +7,7 @@ namespace NYql { TDataProviderInitializer GetYdbDataProviderInitializer( NYdb::TDriver driver, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, - std::shared_ptr<NYq::IDatabaseAsyncResolver> dbResolver) { + std::shared_ptr<NYql::IDatabaseAsyncResolver> dbResolver) { return [driver, credentialsFactory, dbResolver] ( const TString& userName, const TString& sessionId, diff --git a/ydb/library/yql/providers/ydb/provider/yql_ydb_provider.h b/ydb/library/yql/providers/ydb/provider/yql_ydb_provider.h index 56cd6e93314..1135c62753a 100644 --- a/ydb/library/yql/providers/ydb/provider/yql_ydb_provider.h +++ b/ydb/library/yql/providers/ydb/provider/yql_ydb_provider.h @@ -33,14 +33,14 @@ struct TYdbState : public TThrRefBase TYdbConfiguration::TPtr Configuration = MakeIntrusive<TYdbConfiguration>(); const NKikimr::NMiniKQL::IFunctionRegistry* FunctionRegistry = nullptr; ISecuredServiceAccountCredentialsFactory::TPtr CredentialsFactory; - THashMap<std::pair<TString, NYq::DatabaseType>, NYq::TEvents::TDatabaseAuth> DatabaseIds; - std::shared_ptr<NYq::IDatabaseAsyncResolver> DbResolver; + THashMap<std::pair<TString, NYql::DatabaseType>, NYql::TDatabaseAuth> DatabaseIds; + std::shared_ptr<NYql::IDatabaseAsyncResolver> DbResolver; }; TDataProviderInitializer GetYdbDataProviderInitializer( NYdb::TDriver driver, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory = nullptr, - const std::shared_ptr<NYq::IDatabaseAsyncResolver> dbResolver = nullptr + const std::shared_ptr<NYql::IDatabaseAsyncResolver> dbResolver = nullptr ); TIntrusivePtr<IDataProvider> CreateYdbDataSource( diff --git a/ydb/library/yql/providers/ydb/provider/yql_ydb_settings.cpp b/ydb/library/yql/providers/ydb/provider/yql_ydb_settings.cpp index a582395c3c6..b31d717997d 100644 --- a/ydb/library/yql/providers/ydb/provider/yql_ydb_settings.cpp +++ b/ydb/library/yql/providers/ydb/provider/yql_ydb_settings.cpp @@ -20,8 +20,8 @@ bool TYdbConfiguration::HasCluster(TStringBuf cluster) const { void TYdbConfiguration::Init( const TYdbGatewayConfig& config, TIntrusivePtr<TTypeAnnotationContext> typeCtx, - const std::shared_ptr<NYq::IDatabaseAsyncResolver> dbResolver, - THashMap<std::pair<TString, NYq::DatabaseType>, NYq::TEvents::TDatabaseAuth>& databaseIds) + const std::shared_ptr<NYql::IDatabaseAsyncResolver> dbResolver, + THashMap<std::pair<TString, NYql::DatabaseType>, NYql::TDatabaseAuth>& databaseIds) { TVector<TString> clusters(Reserve(config.ClusterMappingSize())); for (auto& cluster: config.GetClusterMapping()) { @@ -43,8 +43,8 @@ void TYdbConfiguration::Init( if (dbResolver) { if (cluster.GetId()) { - databaseIds[std::make_pair(cluster.GetId(), NYq::DatabaseType::Ydb)] = - NYq::TEvents::TDatabaseAuth{structuredTokenJson, cluster.GetAddBearerToToken()}; + databaseIds[std::make_pair(cluster.GetId(), NYql::DatabaseType::Ydb)] = + NYql::TDatabaseAuth{structuredTokenJson, cluster.GetAddBearerToToken()}; DbId2Clusters[cluster.GetId()].emplace_back(cluster.GetName()); } } diff --git a/ydb/library/yql/providers/ydb/provider/yql_ydb_settings.h b/ydb/library/yql/providers/ydb/provider/yql_ydb_settings.h index 7a27452cc59..1d72a52cd34 100644 --- a/ydb/library/yql/providers/ydb/provider/yql_ydb_settings.h +++ b/ydb/library/yql/providers/ydb/provider/yql_ydb_settings.h @@ -5,7 +5,6 @@ #include <ydb/library/yql/providers/common/proto/gateways_config.pb.h> -#include <ydb/core/yq/libs/events/events.h> #include <ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver.h> @@ -34,8 +33,8 @@ struct TYdbConfiguration : public TYdbSettings, public NCommon::TSettingDispatch void Init( const TYdbGatewayConfig& config, TIntrusivePtr<TTypeAnnotationContext> typeCtx, - const std::shared_ptr<NYq::IDatabaseAsyncResolver> dbResolver, - THashMap<std::pair<TString, NYq::DatabaseType>, NYq::TEvents::TDatabaseAuth>& databaseIds + const std::shared_ptr<NYql::IDatabaseAsyncResolver> dbResolver, + THashMap<std::pair<TString, NYql::DatabaseType>, NYql::TDatabaseAuth>& databaseIds ); bool HasCluster(TStringBuf cluster) const; |