diff options
author | dinmukhammed <dinmukhammed@yandex-team.ru> | 2022-02-10 17:56:14 +0300 |
---|---|---|
committer | dinmukhammed <dinmukhammed@yandex-team.ru> | 2022-02-10 17:56:14 +0300 |
commit | 4d80d317f1de5ac3f37b2ffa9a59f7ce27ee30fb (patch) | |
tree | fb98547446aa605b64924c1e00119088231b5b20 | |
parent | c6af0bf45be29629c5dd6ec5f948f678aadce664 (diff) | |
download | ydb-4d80d317f1de5ac3f37b2ffa9a59f7ce27ee30fb.tar.gz |
YQ-839 Fail query on unresolved database id
Fail query on unresolved ids
ref:ae687903e293f90b2406f7a2ec9dc0a75114fc38
13 files changed, 105 insertions, 68 deletions
diff --git a/ydb/core/yq/libs/actors/database_resolver.cpp b/ydb/core/yq/libs/actors/database_resolver.cpp index c784d80c00..ce7a2c66f4 100644 --- a/ydb/core/yq/libs/actors/database_resolver.cpp +++ b/ydb/core/yq/libs/actors/database_resolver.cpp @@ -20,7 +20,7 @@ namespace NYq { using namespace NActors; using namespace NYql; -using TEndpoint = TEvents::TEvEndpointResponse::TEndpoint; +using TEndpoint = TEvents::TDbResolverResponse::TEndpoint; using TParsers = THashMap<DatabaseType, std::function<TEndpoint(NJson::TJsonValue& body, bool)>>; using TCache = TTtlCache<std::tuple<TString, DatabaseType, TEvents::TDatabaseAuth>, std::variant<TEndpoint, TString>>; @@ -79,25 +79,29 @@ private: void DieOnTtl() { Success = false; - TString logMsg = "Could not resolve database ids: "; + TString errorMsg = "Could not resolve database ids: "; bool firstUnresolvedDbId = true; for (const auto& [_, info]: Requests) { const auto& dbId = std::get<0>(info); const auto& dbType = std::get<1>(info); if (const auto it = DatabaseId2Endpoint.find(std::make_pair(dbId, dbType)); it == DatabaseId2Endpoint.end()) { - logMsg += (firstUnresolvedDbId ? TString{""} : TString{", "}) + dbId; + errorMsg += (firstUnresolvedDbId ? TString{""} : TString{", "}) + dbId; if (firstUnresolvedDbId) firstUnresolvedDbId = false; } } - logMsg += TStringBuilder() << " in " << ResolvingTtl << " seconds."; - LOG_E(logMsg); + errorMsg += TStringBuilder() << " in " << ResolvingTtl << " seconds."; + LOG_E(errorMsg); - SendResolvedEndpointsAndDie(); + SendResolvedEndpointsAndDie(errorMsg); } - void SendResolvedEndpointsAndDie() { - Send(Sender, new TEvents::TEvEndpointResponse(std::move(DatabaseId2Endpoint), Success)); + void SendResolvedEndpointsAndDie(const TString& errorMsg) { + NYql::TIssues issues; + if (errorMsg) { + issues.AddIssue(errorMsg); + } + Send(Sender, new TEvents::TEvEndpointResponse(TEvents::TDbResolverResponse(std::move(DatabaseId2Endpoint), Success, issues))); PassAway(); } @@ -143,7 +147,9 @@ private: } } else { errorMessage = ev->Get()->Error; - const TString error = "Cannot resolve databaseId (status = " + ToString(status) + ")"; + const TString error = TStringBuilder() + << "Cannot resolve databaseId (status = " + ToString(status) + "). " + << "Response body from " << ev->Get()->Request->URL << ": " << (ev->Get()->Response ? ev->Get()->Response->Body : "empty"); if (!errorMessage.empty()) { errorMessage += '\n'; } @@ -165,7 +171,7 @@ private: } if (HandledIds == Requests.size()) { - SendResolvedEndpointsAndDie(); + SendResolvedEndpointsAndDie(errorMessage); } LOG_D(DatabaseId2Endpoint.size() << " of " << Requests.size() << " done"); @@ -177,7 +183,7 @@ private: const THashMap<NHttp::THttpOutgoingRequestPtr, std::tuple<TString, DatabaseType, TEvents::TDatabaseAuth>> Requests; const TString TraceId; const bool MdbTransformHost; - THashMap<std::pair<TString, DatabaseType>, TEvents::TEvEndpointResponse::TEndpoint> DatabaseId2Endpoint; + THashMap<std::pair<TString, DatabaseType>, TEndpoint> DatabaseId2Endpoint; size_t HandledIds = 0; bool Success = true; const TParsers& Parsers; @@ -216,7 +222,7 @@ public: } Y_ENSURE(endpoint); - return TEvents::TEvEndpointResponse::TEndpoint{endpoint, database, secure}; + return TEndpoint{endpoint, database, secure}; }; Parsers[DatabaseType::Ydb] = ydbParser; Parsers[DatabaseType::DataStreams] = [ydbParser](NJson::TJsonValue& databaseInfo, bool mdbTransformHost) @@ -244,7 +250,7 @@ public: ythrow yexception() << "No ALIVE ClickHouse hosts exist"; } endpoint = mdbTransformHost ? TransformMdbHostToCorrectFormat(endpoint) : endpoint; - return TEvents::TEvEndpointResponse::TEndpoint{endpoint, "", true}; + return TEndpoint{endpoint, "", true}; }; } @@ -305,7 +311,7 @@ private: ctx.Send(new IEventHandle(HttpProxy, helper, new NHttp::TEvHttpProxy::TEvHttpOutgoingRequest(request))); } } else { - Send(ev->Sender, new TEvents::TEvEndpointResponse(std::move(ready), /*success=*/true)); + Send(ev->Sender, new TEvents::TEvEndpointResponse(TEvents::TDbResolverResponse{std::move(ready), /*success=*/true})); } } diff --git a/ydb/core/yq/libs/db_resolver/db_async_resolver.h b/ydb/core/yq/libs/db_resolver/db_async_resolver.h index ae81326839..a1e4ab18a1 100644 --- a/ydb/core/yq/libs/db_resolver/db_async_resolver.h +++ b/ydb/core/yq/libs/db_resolver/db_async_resolver.h @@ -5,16 +5,14 @@ namespace NYq { -struct TResolveParams { //TODO: remove +struct TResolveParams { THashMap<std::pair<TString, DatabaseType>, TEvents::TDatabaseAuth> Ids; TString TraceId; }; class IDatabaseAsyncResolver { public: - using TEndpoint = TEvents::TEvEndpointResponse::TEndpoint; - - virtual NThreading::TFuture<THashMap<std::pair<TString, DatabaseType>, TEndpoint>> ResolveIds(const TResolveParams& params) const = 0; + virtual NThreading::TFuture<TEvents::TDbResolverResponse> ResolveIds(const TResolveParams& params) const = 0; virtual ~IDatabaseAsyncResolver() = default; }; diff --git a/ydb/core/yq/libs/db_resolver/db_async_resolver_impl.cpp b/ydb/core/yq/libs/db_resolver/db_async_resolver_impl.cpp index 253c205cee..fb0e2d9789 100644 --- a/ydb/core/yq/libs/db_resolver/db_async_resolver_impl.cpp +++ b/ydb/core/yq/libs/db_resolver/db_async_resolver_impl.cpp @@ -2,8 +2,6 @@ namespace NYq { -using TEndpoint = TEvents::TEvEndpointResponse::TEndpoint; - TDatabaseAsyncResolver::TDatabaseAsyncResolver( NActors::TActorSystem* actorSystem, const NActors::TActorId& recipient, @@ -17,11 +15,11 @@ TDatabaseAsyncResolver::TDatabaseAsyncResolver( , MdbTransformHost(mdbTransformHost) {} -NThreading::TFuture<THashMap<std::pair<TString, DatabaseType>, TEndpoint>> TDatabaseAsyncResolver::ResolveIds(const TResolveParams& params) const { - auto promise = NThreading::NewPromise<THashMap<std::pair<TString, DatabaseType>, TEndpoint>>(); +NThreading::TFuture<TEvents::TDbResolverResponse> TDatabaseAsyncResolver::ResolveIds(const TResolveParams& params) const { + auto promise = NThreading::NewPromise<TEvents::TDbResolverResponse>(); auto callback = MakeHolder<NYql::TRichActorFutureCallback<TEvents::TEvEndpointResponse>>( [promise] (TAutoPtr<NActors::TEventHandle<TEvents::TEvEndpointResponse>>& event) mutable { - promise.SetValue(event->Get()->DatabaseId2Endpoint); + promise.SetValue(std::move(event->Get()->DbResolverResponse)); }, [promise] () mutable { //TODO add logs diff --git a/ydb/core/yq/libs/db_resolver/db_async_resolver_impl.h b/ydb/core/yq/libs/db_resolver/db_async_resolver_impl.h index cb13cde383..27b8f3a6f3 100644 --- a/ydb/core/yq/libs/db_resolver/db_async_resolver_impl.h +++ b/ydb/core/yq/libs/db_resolver/db_async_resolver_impl.h @@ -4,8 +4,6 @@ namespace NYq { -using TEndpoint = TEvents::TEvEndpointResponse::TEndpoint; - class TDatabaseAsyncResolver : public IDatabaseAsyncResolver { public: TDatabaseAsyncResolver( @@ -16,7 +14,7 @@ public: const bool mdbTransformHost ); - NThreading::TFuture<THashMap<std::pair<TString, DatabaseType>, TEndpoint>> ResolveIds(const TResolveParams& params) const override; + NThreading::TFuture<TEvents::TDbResolverResponse> ResolveIds(const TResolveParams& params) const override; private: NActors::TActorSystem* ActorSystem; const NActors::TActorId Recipient; diff --git a/ydb/core/yq/libs/db_resolver/db_async_resolver_with_meta.cpp b/ydb/core/yq/libs/db_resolver/db_async_resolver_with_meta.cpp index 4d38454f62..027415d9d7 100644 --- a/ydb/core/yq/libs/db_resolver/db_async_resolver_with_meta.cpp +++ b/ydb/core/yq/libs/db_resolver/db_async_resolver_with_meta.cpp @@ -20,7 +20,7 @@ namespace NYq { , Connections(connections) {} - NThreading::TFuture<THashMap<std::pair<TString, DatabaseType>, TEndpoint>> TDatabaseAsyncResolverWithMeta::ResolveIds(const TResolveParams& params) const { + NThreading::TFuture<TEvents::TDbResolverResponse> TDatabaseAsyncResolverWithMeta::ResolveIds(const TResolveParams& params) const { return DbResolver.ResolveIds(params); } diff --git a/ydb/core/yq/libs/db_resolver/db_async_resolver_with_meta.h b/ydb/core/yq/libs/db_resolver/db_async_resolver_with_meta.h index 4e0f5ae468..48fa9df041 100644 --- a/ydb/core/yq/libs/db_resolver/db_async_resolver_with_meta.h +++ b/ydb/core/yq/libs/db_resolver/db_async_resolver_with_meta.h @@ -3,8 +3,6 @@ namespace NYq { -using TEndpoint = TEvents::TEvEndpointResponse::TEndpoint; - class TDatabaseAsyncResolverWithMeta : public IDatabaseAsyncResolver { public: TDatabaseAsyncResolverWithMeta( @@ -19,7 +17,7 @@ public: const THashMap<TString, YandexQuery::Connection>& connections ); - NThreading::TFuture<THashMap<std::pair<TString, DatabaseType>, TEndpoint>> ResolveIds(const TResolveParams& params) const override; + NThreading::TFuture<TEvents::TDbResolverResponse> ResolveIds(const TResolveParams& params) const override; TString GetTraceId() const; diff --git a/ydb/core/yq/libs/events/events.h b/ydb/core/yq/libs/events/events.h index b3c70e5c5f..49405cfba4 100644 --- a/ydb/core/yq/libs/events/events.h +++ b/ydb/core/yq/libs/events/events.h @@ -126,19 +126,31 @@ struct TEvents { {} }; - struct TEvEndpointResponse : NActors::TEventLocal<TEvEndpointResponse, TEventIds::EvEndpointResponse> { + struct TDbResolverResponse { struct TEndpoint { TString Endpoint; TString Database; bool Secure = false; }; - THashMap<std::pair<TString, DatabaseType>, TEndpoint> DatabaseId2Endpoint; - bool Success; - TEvEndpointResponse(THashMap<std::pair<TString, DatabaseType>, TEndpoint>&& res, bool success) - : DatabaseId2Endpoint(std::move(res)) + TDbResolverResponse() = default; + + TDbResolverResponse( + THashMap<std::pair<TString, DatabaseType>, TEndpoint>&& databaseId2Endpoint, + bool success = false, + const NYql::TIssues& issues = {}) + : DatabaseId2Endpoint(databaseId2Endpoint) , Success(success) - { } + , Issues(issues) {} + + THashMap<std::pair<TString, DatabaseType>, TEndpoint> DatabaseId2Endpoint; + bool Success = false; + NYql::TIssues Issues; + }; + + struct TEvEndpointResponse : NActors::TEventLocal<TEvEndpointResponse, TEventIds::EvEndpointResponse> { + TDbResolverResponse DbResolverResponse; + explicit TEvEndpointResponse(TDbResolverResponse&& response) noexcept : DbResolverResponse(std::move(response)) {} }; struct TDatabaseAuth { diff --git a/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_io_discovery.cpp b/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_io_discovery.cpp index 193c1e7bae..8f1ac3839e 100644 --- a/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_io_discovery.cpp +++ b/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_io_discovery.cpp @@ -16,7 +16,7 @@ using namespace NNodes; class TClickHouseIODiscoveryTransformer : public TGraphTransformerBase { -using TDbId2Endpoint = THashMap<std::pair<TString, NYq::DatabaseType>, NYq::TEvents::TEvEndpointResponse::TEndpoint>; +using TDbId2Endpoint = THashMap<std::pair<TString, NYq::DatabaseType>, NYq::TEvents::TDbResolverResponse::TEndpoint>; public: TClickHouseIODiscoveryTransformer(TClickHouseState::TPtr state) @@ -60,11 +60,15 @@ public: } } YQL_CLOG(DEBUG, ProviderClickHouse) << "Ids to resolve: " << ids.size(); + if (ids.empty()) { return TStatus::Ok; } - AsyncFuture_ = State_->DbResolver->ResolveIds({ids, State_->DbResolver->GetTraceId()}).Apply([resolvedIds_ = ResolvedIds_](const auto& future) { - *resolvedIds_ = future.GetValue(); + + const std::weak_ptr<NYq::TEvents::TDbResolverResponse> response = DbResolverResponse_; + AsyncFuture_ = State_->DbResolver->ResolveIds({ids, State_->DbResolver->GetTraceId()}).Apply([response](auto future) { + if (const auto res = response.lock()) + *res = std::move(future.ExtractValue()); }); return TStatus::Async; } @@ -73,11 +77,15 @@ public: return AsyncFuture_; } - TStatus DoApplyAsyncChanges(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext&) final { + TStatus DoApplyAsyncChanges(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) final { output = input; AsyncFuture_.GetValue(); - FullResolvedIds_.insert(ResolvedIds_->begin(), ResolvedIds_->end()); - ResolvedIds_->clear(); + if (!DbResolverResponse_->Success) { + ctx.IssueManager.AddIssues(DbResolverResponse_->Issues); + return TStatus::Error; + } + FullResolvedIds_.insert(DbResolverResponse_->DatabaseId2Endpoint.begin(), DbResolverResponse_->DatabaseId2Endpoint.end()); + DbResolverResponse_ = std::make_shared<NYq::TEvents::TDbResolverResponse>(); YQL_CLOG(DEBUG, ProviderClickHouse) << "ResolvedIds: " << FullResolvedIds_.size(); auto& endpoints = State_->Configuration->Endpoints; const auto& id2Clusters = State_->Configuration->DbId2Clusters; @@ -107,7 +115,7 @@ private: NThreading::TFuture<void> AsyncFuture_; TDbId2Endpoint FullResolvedIds_; - std::shared_ptr<TDbId2Endpoint> ResolvedIds_ = std::make_shared<TDbId2Endpoint>(); + std::shared_ptr<NYq::TEvents::TDbResolverResponse> DbResolverResponse_ = std::make_shared<NYq::TEvents::TDbResolverResponse>(); }; } diff --git a/ydb/library/yql/providers/pq/provider/yql_pq_datasink_io_discovery.cpp b/ydb/library/yql/providers/pq/provider/yql_pq_datasink_io_discovery.cpp index 847afd3b3d..976469de6a 100644 --- a/ydb/library/yql/providers/pq/provider/yql_pq_datasink_io_discovery.cpp +++ b/ydb/library/yql/providers/pq/provider/yql_pq_datasink_io_discovery.cpp @@ -12,9 +12,7 @@ namespace { using namespace NNodes; class TPqDataSinkIODiscoveryTransformer : public TGraphTransformerBase { - -using TDbId2Endpoint = THashMap<std::pair<TString, NYq::DatabaseType>, NYq::TEvents::TEvEndpointResponse::TEndpoint>; - +using TDbId2Endpoint = THashMap<std::pair<TString, NYq::DatabaseType>, NYq::TEvents::TDbResolverResponse::TEndpoint>; public: explicit TPqDataSinkIODiscoveryTransformer(TPqState::TPtr state) : State_(state) @@ -36,8 +34,11 @@ public: if (ids.empty()) return TStatus::Ok; - AsyncFuture_ = State_->DbResolver->ResolveIds({ids, State_->DbResolver->GetTraceId()}).Apply([resolvedIds_ = ResolvedIds_](const auto& future) { - *resolvedIds_ = future.GetValue(); + const std::weak_ptr<NYq::TEvents::TDbResolverResponse> response = DbResolverResponse_; + AsyncFuture_ = State_->DbResolver->ResolveIds({ids, State_->DbResolver->GetTraceId()}).Apply([response](auto future) + { + if (const auto res = response.lock()) + *res = std::move(future.ExtractValue()); }); return TStatus::Async; } @@ -46,11 +47,15 @@ public: return AsyncFuture_; } - TStatus DoApplyAsyncChanges(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext&) final { + TStatus DoApplyAsyncChanges(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) final { output = input; AsyncFuture_.GetValue(); - FullResolvedIds_.insert(ResolvedIds_->begin(), ResolvedIds_->end()); - ResolvedIds_->clear(); + if (!DbResolverResponse_->Success) { + ctx.IssueManager.AddIssues(DbResolverResponse_->Issues); + return TStatus::Error; + } + FullResolvedIds_.insert(DbResolverResponse_->DatabaseId2Endpoint.begin(), DbResolverResponse_->DatabaseId2Endpoint.end()); + DbResolverResponse_ = std::make_shared<NYq::TEvents::TDbResolverResponse>(); FillSettingsWithResolvedYdsIds(State_, FullResolvedIds_); return TStatus::Ok; } @@ -59,7 +64,7 @@ private: const TPqState::TPtr State_; NThreading::TFuture<void> AsyncFuture_; TDbId2Endpoint FullResolvedIds_; - std::shared_ptr<TDbId2Endpoint> ResolvedIds_ = std::make_shared<TDbId2Endpoint>(); + std::shared_ptr<NYq::TEvents::TDbResolverResponse> DbResolverResponse_ = std::make_shared<NYq::TEvents::TDbResolverResponse>(); }; } diff --git a/ydb/library/yql/providers/pq/provider/yql_pq_helpers.cpp b/ydb/library/yql/providers/pq/provider/yql_pq_helpers.cpp index 47a2134147..6b3590ef04 100644 --- a/ydb/library/yql/providers/pq/provider/yql_pq_helpers.cpp +++ b/ydb/library/yql/providers/pq/provider/yql_pq_helpers.cpp @@ -79,7 +79,7 @@ void FindYdsDbIdsForResolving( void FillSettingsWithResolvedYdsIds( const TPqState::TPtr& state, - const THashMap<std::pair<TString, NYq::DatabaseType>, NYq::TEvents::TEvEndpointResponse::TEndpoint>& fullResolvedIds) + const THashMap<std::pair<TString, NYq::DatabaseType>, NYq::TEvents::TDbResolverResponse::TEndpoint>& fullResolvedIds) { YQL_CLOG(INFO, ProviderPq) << "FullResolvedIds size: " << fullResolvedIds.size(); auto& clusters = state->Configuration->ClustersConfigurationSettings; diff --git a/ydb/library/yql/providers/pq/provider/yql_pq_helpers.h b/ydb/library/yql/providers/pq/provider/yql_pq_helpers.h index 628ab2f65d..f1f808fe31 100644 --- a/ydb/library/yql/providers/pq/provider/yql_pq_helpers.h +++ b/ydb/library/yql/providers/pq/provider/yql_pq_helpers.h @@ -19,6 +19,6 @@ void FindYdsDbIdsForResolving( void FillSettingsWithResolvedYdsIds( const TPqState::TPtr& state, - const THashMap<std::pair<TString, NYq::DatabaseType>, NYq::TEvents::TEvEndpointResponse::TEndpoint>& fullResolvedIds); + const THashMap<std::pair<TString, NYq::DatabaseType>, NYq::TEvents::TDbResolverResponse::TEndpoint>& fullResolvedIds); } // namespace NYql diff --git a/ydb/library/yql/providers/pq/provider/yql_pq_io_discovery.cpp b/ydb/library/yql/providers/pq/provider/yql_pq_io_discovery.cpp index f2e347cf73..bb9650d5cc 100644 --- a/ydb/library/yql/providers/pq/provider/yql_pq_io_discovery.cpp +++ b/ydb/library/yql/providers/pq/provider/yql_pq_io_discovery.cpp @@ -13,7 +13,7 @@ using namespace NNodes; class TPqIODiscoveryTransformer : public TGraphTransformerBase { -using TDbId2Endpoint = THashMap<std::pair<TString, NYq::DatabaseType>, NYq::TEvents::TEvEndpointResponse::TEndpoint>; +using TDbId2Endpoint = THashMap<std::pair<TString, NYq::DatabaseType>, NYq::TEvents::TDbResolverResponse::TEndpoint>; public: explicit TPqIODiscoveryTransformer(TPqState::TPtr state) @@ -36,8 +36,11 @@ public: if (ids.empty()) return TStatus::Ok; - AsyncFuture_ = State_->DbResolver->ResolveIds({ids, State_->DbResolver->GetTraceId()}).Apply([resolvedIds_ = ResolvedIds_](const auto& future) { - *resolvedIds_ = future.GetValue(); + const std::weak_ptr<NYq::TEvents::TDbResolverResponse> response = DbResolverResponse_; + AsyncFuture_ = State_->DbResolver->ResolveIds({ids, State_->DbResolver->GetTraceId()}).Apply([response](auto future) + { + if (const auto res = response.lock()) + *res = std::move(future.ExtractValue()); }); return TStatus::Async; } @@ -46,11 +49,15 @@ public: return AsyncFuture_; } - TStatus DoApplyAsyncChanges(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext&) final { + TStatus DoApplyAsyncChanges(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) final { output = input; AsyncFuture_.GetValue(); - FullResolvedIds_.insert(ResolvedIds_->begin(), ResolvedIds_->end()); - ResolvedIds_->clear(); + if (!DbResolverResponse_->Success) { + ctx.IssueManager.AddIssues(DbResolverResponse_->Issues); + return TStatus::Error; + } + FullResolvedIds_.insert(DbResolverResponse_->DatabaseId2Endpoint.begin(), DbResolverResponse_->DatabaseId2Endpoint.end()); + DbResolverResponse_ = std::make_shared<NYq::TEvents::TDbResolverResponse>(); FillSettingsWithResolvedYdsIds(State_, FullResolvedIds_); return TStatus::Ok; } @@ -59,7 +66,7 @@ private: const TPqState::TPtr State_; NThreading::TFuture<void> AsyncFuture_; TDbId2Endpoint FullResolvedIds_; - std::shared_ptr<TDbId2Endpoint> ResolvedIds_ = std::make_shared<TDbId2Endpoint>(); + std::shared_ptr<NYq::TEvents::TDbResolverResponse> DbResolverResponse_ = std::make_shared<NYq::TEvents::TDbResolverResponse>(); }; } diff --git a/ydb/library/yql/providers/ydb/provider/yql_ydb_io_discovery.cpp b/ydb/library/yql/providers/ydb/provider/yql_ydb_io_discovery.cpp index 794c725164..b837a61aa6 100644 --- a/ydb/library/yql/providers/ydb/provider/yql_ydb_io_discovery.cpp +++ b/ydb/library/yql/providers/ydb/provider/yql_ydb_io_discovery.cpp @@ -19,7 +19,7 @@ namespace { using namespace NNodes; class TYdbIODiscoveryTransformer : public TGraphTransformerBase { - using TDbId2Endpoint = THashMap<std::pair<TString, NYq::DatabaseType>, NYq::TEvents::TEvEndpointResponse::TEndpoint>; +using TDbId2Endpoint = THashMap<std::pair<TString, NYq::DatabaseType>, NYq::TEvents::TDbResolverResponse::TEndpoint>; public: TYdbIODiscoveryTransformer(TYdbState::TPtr state) : State_(std::move(state)) @@ -59,8 +59,11 @@ public: if (ids.empty()) { return TStatus::Ok; } - AsyncFuture_ = State_->DbResolver->ResolveIds({ids, State_->DbResolver->GetTraceId()}).Apply([resolvedIds_ = ResolvedIds_](const auto& future) { - *resolvedIds_ = future.GetValue(); + const std::weak_ptr<NYq::TEvents::TDbResolverResponse> response = DbResolverResponse_; + AsyncFuture_ = State_->DbResolver->ResolveIds({ids, State_->DbResolver->GetTraceId()}).Apply([response](auto future) + { + if (const auto res = response.lock()) + *res = std::move(future.ExtractValue()); }); return TStatus::Async; } @@ -69,11 +72,15 @@ public: return AsyncFuture_; } - TStatus DoApplyAsyncChanges(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext&) final { + TStatus DoApplyAsyncChanges(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) final { output = input; AsyncFuture_.GetValue(); - FullResolvedIds_.insert(ResolvedIds_->begin(), ResolvedIds_->end()); - ResolvedIds_->clear(); + if (!DbResolverResponse_->Success) { + ctx.IssueManager.AddIssues(DbResolverResponse_->Issues); + return TStatus::Error; + } + FullResolvedIds_.insert(DbResolverResponse_->DatabaseId2Endpoint.begin(), DbResolverResponse_->DatabaseId2Endpoint.end()); + DbResolverResponse_ = std::make_shared<NYq::TEvents::TDbResolverResponse>(); auto& clusters = State_->Configuration->Clusters; const auto& id2Clusters = State_->Configuration->DbId2Clusters; for (const auto& [dbIdWithType, info] : FullResolvedIds_) { @@ -96,7 +103,7 @@ private: NThreading::TFuture<void> AsyncFuture_; TDbId2Endpoint FullResolvedIds_; - std::shared_ptr<TDbId2Endpoint> ResolvedIds_ = std::make_shared<TDbId2Endpoint>(); + std::shared_ptr<NYq::TEvents::TDbResolverResponse> DbResolverResponse_ = std::make_shared<NYq::TEvents::TDbResolverResponse>(); }; } |