aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authordinmukhammed <dinmukhammed@yandex-team.ru>2022-02-10 17:56:14 +0300
committerdinmukhammed <dinmukhammed@yandex-team.ru>2022-02-10 17:56:14 +0300
commit4d80d317f1de5ac3f37b2ffa9a59f7ce27ee30fb (patch)
treefb98547446aa605b64924c1e00119088231b5b20
parentc6af0bf45be29629c5dd6ec5f948f678aadce664 (diff)
downloadydb-4d80d317f1de5ac3f37b2ffa9a59f7ce27ee30fb.tar.gz
YQ-839 Fail query on unresolved database id
Fail query on unresolved ids ref:ae687903e293f90b2406f7a2ec9dc0a75114fc38
-rw-r--r--ydb/core/yq/libs/actors/database_resolver.cpp34
-rw-r--r--ydb/core/yq/libs/db_resolver/db_async_resolver.h6
-rw-r--r--ydb/core/yq/libs/db_resolver/db_async_resolver_impl.cpp8
-rw-r--r--ydb/core/yq/libs/db_resolver/db_async_resolver_impl.h4
-rw-r--r--ydb/core/yq/libs/db_resolver/db_async_resolver_with_meta.cpp2
-rw-r--r--ydb/core/yq/libs/db_resolver/db_async_resolver_with_meta.h4
-rw-r--r--ydb/core/yq/libs/events/events.h24
-rw-r--r--ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_io_discovery.cpp22
-rw-r--r--ydb/library/yql/providers/pq/provider/yql_pq_datasink_io_discovery.cpp23
-rw-r--r--ydb/library/yql/providers/pq/provider/yql_pq_helpers.cpp2
-rw-r--r--ydb/library/yql/providers/pq/provider/yql_pq_helpers.h2
-rw-r--r--ydb/library/yql/providers/pq/provider/yql_pq_io_discovery.cpp21
-rw-r--r--ydb/library/yql/providers/ydb/provider/yql_ydb_io_discovery.cpp21
13 files changed, 105 insertions, 68 deletions
diff --git a/ydb/core/yq/libs/actors/database_resolver.cpp b/ydb/core/yq/libs/actors/database_resolver.cpp
index c784d80c00..ce7a2c66f4 100644
--- a/ydb/core/yq/libs/actors/database_resolver.cpp
+++ b/ydb/core/yq/libs/actors/database_resolver.cpp
@@ -20,7 +20,7 @@ namespace NYq {
using namespace NActors;
using namespace NYql;
-using TEndpoint = TEvents::TEvEndpointResponse::TEndpoint;
+using TEndpoint = TEvents::TDbResolverResponse::TEndpoint;
using TParsers = THashMap<DatabaseType, std::function<TEndpoint(NJson::TJsonValue& body, bool)>>;
using TCache = TTtlCache<std::tuple<TString, DatabaseType, TEvents::TDatabaseAuth>, std::variant<TEndpoint, TString>>;
@@ -79,25 +79,29 @@ private:
void DieOnTtl() {
Success = false;
- TString logMsg = "Could not resolve database ids: ";
+ TString errorMsg = "Could not resolve database ids: ";
bool firstUnresolvedDbId = true;
for (const auto& [_, info]: Requests) {
const auto& dbId = std::get<0>(info);
const auto& dbType = std::get<1>(info);
if (const auto it = DatabaseId2Endpoint.find(std::make_pair(dbId, dbType)); it == DatabaseId2Endpoint.end()) {
- logMsg += (firstUnresolvedDbId ? TString{""} : TString{", "}) + dbId;
+ errorMsg += (firstUnresolvedDbId ? TString{""} : TString{", "}) + dbId;
if (firstUnresolvedDbId)
firstUnresolvedDbId = false;
}
}
- logMsg += TStringBuilder() << " in " << ResolvingTtl << " seconds.";
- LOG_E(logMsg);
+ errorMsg += TStringBuilder() << " in " << ResolvingTtl << " seconds.";
+ LOG_E(errorMsg);
- SendResolvedEndpointsAndDie();
+ SendResolvedEndpointsAndDie(errorMsg);
}
- void SendResolvedEndpointsAndDie() {
- Send(Sender, new TEvents::TEvEndpointResponse(std::move(DatabaseId2Endpoint), Success));
+ void SendResolvedEndpointsAndDie(const TString& errorMsg) {
+ NYql::TIssues issues;
+ if (errorMsg) {
+ issues.AddIssue(errorMsg);
+ }
+ Send(Sender, new TEvents::TEvEndpointResponse(TEvents::TDbResolverResponse(std::move(DatabaseId2Endpoint), Success, issues)));
PassAway();
}
@@ -143,7 +147,9 @@ private:
}
} else {
errorMessage = ev->Get()->Error;
- const TString error = "Cannot resolve databaseId (status = " + ToString(status) + ")";
+ const TString error = TStringBuilder()
+ << "Cannot resolve databaseId (status = " + ToString(status) + "). "
+ << "Response body from " << ev->Get()->Request->URL << ": " << (ev->Get()->Response ? ev->Get()->Response->Body : "empty");
if (!errorMessage.empty()) {
errorMessage += '\n';
}
@@ -165,7 +171,7 @@ private:
}
if (HandledIds == Requests.size()) {
- SendResolvedEndpointsAndDie();
+ SendResolvedEndpointsAndDie(errorMessage);
}
LOG_D(DatabaseId2Endpoint.size() << " of " << Requests.size() << " done");
@@ -177,7 +183,7 @@ private:
const THashMap<NHttp::THttpOutgoingRequestPtr, std::tuple<TString, DatabaseType, TEvents::TDatabaseAuth>> Requests;
const TString TraceId;
const bool MdbTransformHost;
- THashMap<std::pair<TString, DatabaseType>, TEvents::TEvEndpointResponse::TEndpoint> DatabaseId2Endpoint;
+ THashMap<std::pair<TString, DatabaseType>, TEndpoint> DatabaseId2Endpoint;
size_t HandledIds = 0;
bool Success = true;
const TParsers& Parsers;
@@ -216,7 +222,7 @@ public:
}
Y_ENSURE(endpoint);
- return TEvents::TEvEndpointResponse::TEndpoint{endpoint, database, secure};
+ return TEndpoint{endpoint, database, secure};
};
Parsers[DatabaseType::Ydb] = ydbParser;
Parsers[DatabaseType::DataStreams] = [ydbParser](NJson::TJsonValue& databaseInfo, bool mdbTransformHost)
@@ -244,7 +250,7 @@ public:
ythrow yexception() << "No ALIVE ClickHouse hosts exist";
}
endpoint = mdbTransformHost ? TransformMdbHostToCorrectFormat(endpoint) : endpoint;
- return TEvents::TEvEndpointResponse::TEndpoint{endpoint, "", true};
+ return TEndpoint{endpoint, "", true};
};
}
@@ -305,7 +311,7 @@ private:
ctx.Send(new IEventHandle(HttpProxy, helper, new NHttp::TEvHttpProxy::TEvHttpOutgoingRequest(request)));
}
} else {
- Send(ev->Sender, new TEvents::TEvEndpointResponse(std::move(ready), /*success=*/true));
+ Send(ev->Sender, new TEvents::TEvEndpointResponse(TEvents::TDbResolverResponse{std::move(ready), /*success=*/true}));
}
}
diff --git a/ydb/core/yq/libs/db_resolver/db_async_resolver.h b/ydb/core/yq/libs/db_resolver/db_async_resolver.h
index ae81326839..a1e4ab18a1 100644
--- a/ydb/core/yq/libs/db_resolver/db_async_resolver.h
+++ b/ydb/core/yq/libs/db_resolver/db_async_resolver.h
@@ -5,16 +5,14 @@
namespace NYq {
-struct TResolveParams { //TODO: remove
+struct TResolveParams {
THashMap<std::pair<TString, DatabaseType>, TEvents::TDatabaseAuth> Ids;
TString TraceId;
};
class IDatabaseAsyncResolver {
public:
- using TEndpoint = TEvents::TEvEndpointResponse::TEndpoint;
-
- virtual NThreading::TFuture<THashMap<std::pair<TString, DatabaseType>, TEndpoint>> ResolveIds(const TResolveParams& params) const = 0;
+ virtual NThreading::TFuture<TEvents::TDbResolverResponse> ResolveIds(const TResolveParams& params) const = 0;
virtual ~IDatabaseAsyncResolver() = default;
};
diff --git a/ydb/core/yq/libs/db_resolver/db_async_resolver_impl.cpp b/ydb/core/yq/libs/db_resolver/db_async_resolver_impl.cpp
index 253c205cee..fb0e2d9789 100644
--- a/ydb/core/yq/libs/db_resolver/db_async_resolver_impl.cpp
+++ b/ydb/core/yq/libs/db_resolver/db_async_resolver_impl.cpp
@@ -2,8 +2,6 @@
namespace NYq {
-using TEndpoint = TEvents::TEvEndpointResponse::TEndpoint;
-
TDatabaseAsyncResolver::TDatabaseAsyncResolver(
NActors::TActorSystem* actorSystem,
const NActors::TActorId& recipient,
@@ -17,11 +15,11 @@ TDatabaseAsyncResolver::TDatabaseAsyncResolver(
, MdbTransformHost(mdbTransformHost)
{}
-NThreading::TFuture<THashMap<std::pair<TString, DatabaseType>, TEndpoint>> TDatabaseAsyncResolver::ResolveIds(const TResolveParams& params) const {
- auto promise = NThreading::NewPromise<THashMap<std::pair<TString, DatabaseType>, TEndpoint>>();
+NThreading::TFuture<TEvents::TDbResolverResponse> TDatabaseAsyncResolver::ResolveIds(const TResolveParams& params) const {
+ auto promise = NThreading::NewPromise<TEvents::TDbResolverResponse>();
auto callback = MakeHolder<NYql::TRichActorFutureCallback<TEvents::TEvEndpointResponse>>(
[promise] (TAutoPtr<NActors::TEventHandle<TEvents::TEvEndpointResponse>>& event) mutable {
- promise.SetValue(event->Get()->DatabaseId2Endpoint);
+ promise.SetValue(std::move(event->Get()->DbResolverResponse));
},
[promise] () mutable {
//TODO add logs
diff --git a/ydb/core/yq/libs/db_resolver/db_async_resolver_impl.h b/ydb/core/yq/libs/db_resolver/db_async_resolver_impl.h
index cb13cde383..27b8f3a6f3 100644
--- a/ydb/core/yq/libs/db_resolver/db_async_resolver_impl.h
+++ b/ydb/core/yq/libs/db_resolver/db_async_resolver_impl.h
@@ -4,8 +4,6 @@
namespace NYq {
-using TEndpoint = TEvents::TEvEndpointResponse::TEndpoint;
-
class TDatabaseAsyncResolver : public IDatabaseAsyncResolver {
public:
TDatabaseAsyncResolver(
@@ -16,7 +14,7 @@ public:
const bool mdbTransformHost
);
- NThreading::TFuture<THashMap<std::pair<TString, DatabaseType>, TEndpoint>> ResolveIds(const TResolveParams& params) const override;
+ NThreading::TFuture<TEvents::TDbResolverResponse> ResolveIds(const TResolveParams& params) const override;
private:
NActors::TActorSystem* ActorSystem;
const NActors::TActorId Recipient;
diff --git a/ydb/core/yq/libs/db_resolver/db_async_resolver_with_meta.cpp b/ydb/core/yq/libs/db_resolver/db_async_resolver_with_meta.cpp
index 4d38454f62..027415d9d7 100644
--- a/ydb/core/yq/libs/db_resolver/db_async_resolver_with_meta.cpp
+++ b/ydb/core/yq/libs/db_resolver/db_async_resolver_with_meta.cpp
@@ -20,7 +20,7 @@ namespace NYq {
, Connections(connections)
{}
- NThreading::TFuture<THashMap<std::pair<TString, DatabaseType>, TEndpoint>> TDatabaseAsyncResolverWithMeta::ResolveIds(const TResolveParams& params) const {
+ NThreading::TFuture<TEvents::TDbResolverResponse> TDatabaseAsyncResolverWithMeta::ResolveIds(const TResolveParams& params) const {
return DbResolver.ResolveIds(params);
}
diff --git a/ydb/core/yq/libs/db_resolver/db_async_resolver_with_meta.h b/ydb/core/yq/libs/db_resolver/db_async_resolver_with_meta.h
index 4e0f5ae468..48fa9df041 100644
--- a/ydb/core/yq/libs/db_resolver/db_async_resolver_with_meta.h
+++ b/ydb/core/yq/libs/db_resolver/db_async_resolver_with_meta.h
@@ -3,8 +3,6 @@
namespace NYq {
-using TEndpoint = TEvents::TEvEndpointResponse::TEndpoint;
-
class TDatabaseAsyncResolverWithMeta : public IDatabaseAsyncResolver {
public:
TDatabaseAsyncResolverWithMeta(
@@ -19,7 +17,7 @@ public:
const THashMap<TString, YandexQuery::Connection>& connections
);
- NThreading::TFuture<THashMap<std::pair<TString, DatabaseType>, TEndpoint>> ResolveIds(const TResolveParams& params) const override;
+ NThreading::TFuture<TEvents::TDbResolverResponse> ResolveIds(const TResolveParams& params) const override;
TString GetTraceId() const;
diff --git a/ydb/core/yq/libs/events/events.h b/ydb/core/yq/libs/events/events.h
index b3c70e5c5f..49405cfba4 100644
--- a/ydb/core/yq/libs/events/events.h
+++ b/ydb/core/yq/libs/events/events.h
@@ -126,19 +126,31 @@ struct TEvents {
{}
};
- struct TEvEndpointResponse : NActors::TEventLocal<TEvEndpointResponse, TEventIds::EvEndpointResponse> {
+ struct TDbResolverResponse {
struct TEndpoint {
TString Endpoint;
TString Database;
bool Secure = false;
};
- THashMap<std::pair<TString, DatabaseType>, TEndpoint> DatabaseId2Endpoint;
- bool Success;
- TEvEndpointResponse(THashMap<std::pair<TString, DatabaseType>, TEndpoint>&& res, bool success)
- : DatabaseId2Endpoint(std::move(res))
+ TDbResolverResponse() = default;
+
+ TDbResolverResponse(
+ THashMap<std::pair<TString, DatabaseType>, TEndpoint>&& databaseId2Endpoint,
+ bool success = false,
+ const NYql::TIssues& issues = {})
+ : DatabaseId2Endpoint(databaseId2Endpoint)
, Success(success)
- { }
+ , Issues(issues) {}
+
+ THashMap<std::pair<TString, DatabaseType>, TEndpoint> DatabaseId2Endpoint;
+ bool Success = false;
+ NYql::TIssues Issues;
+ };
+
+ struct TEvEndpointResponse : NActors::TEventLocal<TEvEndpointResponse, TEventIds::EvEndpointResponse> {
+ TDbResolverResponse DbResolverResponse;
+ explicit TEvEndpointResponse(TDbResolverResponse&& response) noexcept : DbResolverResponse(std::move(response)) {}
};
struct TDatabaseAuth {
diff --git a/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_io_discovery.cpp b/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_io_discovery.cpp
index 193c1e7bae..8f1ac3839e 100644
--- a/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_io_discovery.cpp
+++ b/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_io_discovery.cpp
@@ -16,7 +16,7 @@ using namespace NNodes;
class TClickHouseIODiscoveryTransformer : public TGraphTransformerBase {
-using TDbId2Endpoint = THashMap<std::pair<TString, NYq::DatabaseType>, NYq::TEvents::TEvEndpointResponse::TEndpoint>;
+using TDbId2Endpoint = THashMap<std::pair<TString, NYq::DatabaseType>, NYq::TEvents::TDbResolverResponse::TEndpoint>;
public:
TClickHouseIODiscoveryTransformer(TClickHouseState::TPtr state)
@@ -60,11 +60,15 @@ public:
}
}
YQL_CLOG(DEBUG, ProviderClickHouse) << "Ids to resolve: " << ids.size();
+
if (ids.empty()) {
return TStatus::Ok;
}
- AsyncFuture_ = State_->DbResolver->ResolveIds({ids, State_->DbResolver->GetTraceId()}).Apply([resolvedIds_ = ResolvedIds_](const auto& future) {
- *resolvedIds_ = future.GetValue();
+
+ const std::weak_ptr<NYq::TEvents::TDbResolverResponse> response = DbResolverResponse_;
+ AsyncFuture_ = State_->DbResolver->ResolveIds({ids, State_->DbResolver->GetTraceId()}).Apply([response](auto future) {
+ if (const auto res = response.lock())
+ *res = std::move(future.ExtractValue());
});
return TStatus::Async;
}
@@ -73,11 +77,15 @@ public:
return AsyncFuture_;
}
- TStatus DoApplyAsyncChanges(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext&) final {
+ TStatus DoApplyAsyncChanges(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) final {
output = input;
AsyncFuture_.GetValue();
- FullResolvedIds_.insert(ResolvedIds_->begin(), ResolvedIds_->end());
- ResolvedIds_->clear();
+ if (!DbResolverResponse_->Success) {
+ ctx.IssueManager.AddIssues(DbResolverResponse_->Issues);
+ return TStatus::Error;
+ }
+ FullResolvedIds_.insert(DbResolverResponse_->DatabaseId2Endpoint.begin(), DbResolverResponse_->DatabaseId2Endpoint.end());
+ DbResolverResponse_ = std::make_shared<NYq::TEvents::TDbResolverResponse>();
YQL_CLOG(DEBUG, ProviderClickHouse) << "ResolvedIds: " << FullResolvedIds_.size();
auto& endpoints = State_->Configuration->Endpoints;
const auto& id2Clusters = State_->Configuration->DbId2Clusters;
@@ -107,7 +115,7 @@ private:
NThreading::TFuture<void> AsyncFuture_;
TDbId2Endpoint FullResolvedIds_;
- std::shared_ptr<TDbId2Endpoint> ResolvedIds_ = std::make_shared<TDbId2Endpoint>();
+ std::shared_ptr<NYq::TEvents::TDbResolverResponse> DbResolverResponse_ = std::make_shared<NYq::TEvents::TDbResolverResponse>();
};
}
diff --git a/ydb/library/yql/providers/pq/provider/yql_pq_datasink_io_discovery.cpp b/ydb/library/yql/providers/pq/provider/yql_pq_datasink_io_discovery.cpp
index 847afd3b3d..976469de6a 100644
--- a/ydb/library/yql/providers/pq/provider/yql_pq_datasink_io_discovery.cpp
+++ b/ydb/library/yql/providers/pq/provider/yql_pq_datasink_io_discovery.cpp
@@ -12,9 +12,7 @@ namespace {
using namespace NNodes;
class TPqDataSinkIODiscoveryTransformer : public TGraphTransformerBase {
-
-using TDbId2Endpoint = THashMap<std::pair<TString, NYq::DatabaseType>, NYq::TEvents::TEvEndpointResponse::TEndpoint>;
-
+using TDbId2Endpoint = THashMap<std::pair<TString, NYq::DatabaseType>, NYq::TEvents::TDbResolverResponse::TEndpoint>;
public:
explicit TPqDataSinkIODiscoveryTransformer(TPqState::TPtr state)
: State_(state)
@@ -36,8 +34,11 @@ public:
if (ids.empty())
return TStatus::Ok;
- AsyncFuture_ = State_->DbResolver->ResolveIds({ids, State_->DbResolver->GetTraceId()}).Apply([resolvedIds_ = ResolvedIds_](const auto& future) {
- *resolvedIds_ = future.GetValue();
+ const std::weak_ptr<NYq::TEvents::TDbResolverResponse> response = DbResolverResponse_;
+ AsyncFuture_ = State_->DbResolver->ResolveIds({ids, State_->DbResolver->GetTraceId()}).Apply([response](auto future)
+ {
+ if (const auto res = response.lock())
+ *res = std::move(future.ExtractValue());
});
return TStatus::Async;
}
@@ -46,11 +47,15 @@ public:
return AsyncFuture_;
}
- TStatus DoApplyAsyncChanges(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext&) final {
+ TStatus DoApplyAsyncChanges(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) final {
output = input;
AsyncFuture_.GetValue();
- FullResolvedIds_.insert(ResolvedIds_->begin(), ResolvedIds_->end());
- ResolvedIds_->clear();
+ if (!DbResolverResponse_->Success) {
+ ctx.IssueManager.AddIssues(DbResolverResponse_->Issues);
+ return TStatus::Error;
+ }
+ FullResolvedIds_.insert(DbResolverResponse_->DatabaseId2Endpoint.begin(), DbResolverResponse_->DatabaseId2Endpoint.end());
+ DbResolverResponse_ = std::make_shared<NYq::TEvents::TDbResolverResponse>();
FillSettingsWithResolvedYdsIds(State_, FullResolvedIds_);
return TStatus::Ok;
}
@@ -59,7 +64,7 @@ private:
const TPqState::TPtr State_;
NThreading::TFuture<void> AsyncFuture_;
TDbId2Endpoint FullResolvedIds_;
- std::shared_ptr<TDbId2Endpoint> ResolvedIds_ = std::make_shared<TDbId2Endpoint>();
+ std::shared_ptr<NYq::TEvents::TDbResolverResponse> DbResolverResponse_ = std::make_shared<NYq::TEvents::TDbResolverResponse>();
};
}
diff --git a/ydb/library/yql/providers/pq/provider/yql_pq_helpers.cpp b/ydb/library/yql/providers/pq/provider/yql_pq_helpers.cpp
index 47a2134147..6b3590ef04 100644
--- a/ydb/library/yql/providers/pq/provider/yql_pq_helpers.cpp
+++ b/ydb/library/yql/providers/pq/provider/yql_pq_helpers.cpp
@@ -79,7 +79,7 @@ void FindYdsDbIdsForResolving(
void FillSettingsWithResolvedYdsIds(
const TPqState::TPtr& state,
- const THashMap<std::pair<TString, NYq::DatabaseType>, NYq::TEvents::TEvEndpointResponse::TEndpoint>& fullResolvedIds)
+ const THashMap<std::pair<TString, NYq::DatabaseType>, NYq::TEvents::TDbResolverResponse::TEndpoint>& fullResolvedIds)
{
YQL_CLOG(INFO, ProviderPq) << "FullResolvedIds size: " << fullResolvedIds.size();
auto& clusters = state->Configuration->ClustersConfigurationSettings;
diff --git a/ydb/library/yql/providers/pq/provider/yql_pq_helpers.h b/ydb/library/yql/providers/pq/provider/yql_pq_helpers.h
index 628ab2f65d..f1f808fe31 100644
--- a/ydb/library/yql/providers/pq/provider/yql_pq_helpers.h
+++ b/ydb/library/yql/providers/pq/provider/yql_pq_helpers.h
@@ -19,6 +19,6 @@ void FindYdsDbIdsForResolving(
void FillSettingsWithResolvedYdsIds(
const TPqState::TPtr& state,
- const THashMap<std::pair<TString, NYq::DatabaseType>, NYq::TEvents::TEvEndpointResponse::TEndpoint>& fullResolvedIds);
+ const THashMap<std::pair<TString, NYq::DatabaseType>, NYq::TEvents::TDbResolverResponse::TEndpoint>& fullResolvedIds);
} // namespace NYql
diff --git a/ydb/library/yql/providers/pq/provider/yql_pq_io_discovery.cpp b/ydb/library/yql/providers/pq/provider/yql_pq_io_discovery.cpp
index f2e347cf73..bb9650d5cc 100644
--- a/ydb/library/yql/providers/pq/provider/yql_pq_io_discovery.cpp
+++ b/ydb/library/yql/providers/pq/provider/yql_pq_io_discovery.cpp
@@ -13,7 +13,7 @@ using namespace NNodes;
class TPqIODiscoveryTransformer : public TGraphTransformerBase {
-using TDbId2Endpoint = THashMap<std::pair<TString, NYq::DatabaseType>, NYq::TEvents::TEvEndpointResponse::TEndpoint>;
+using TDbId2Endpoint = THashMap<std::pair<TString, NYq::DatabaseType>, NYq::TEvents::TDbResolverResponse::TEndpoint>;
public:
explicit TPqIODiscoveryTransformer(TPqState::TPtr state)
@@ -36,8 +36,11 @@ public:
if (ids.empty())
return TStatus::Ok;
- AsyncFuture_ = State_->DbResolver->ResolveIds({ids, State_->DbResolver->GetTraceId()}).Apply([resolvedIds_ = ResolvedIds_](const auto& future) {
- *resolvedIds_ = future.GetValue();
+ const std::weak_ptr<NYq::TEvents::TDbResolverResponse> response = DbResolverResponse_;
+ AsyncFuture_ = State_->DbResolver->ResolveIds({ids, State_->DbResolver->GetTraceId()}).Apply([response](auto future)
+ {
+ if (const auto res = response.lock())
+ *res = std::move(future.ExtractValue());
});
return TStatus::Async;
}
@@ -46,11 +49,15 @@ public:
return AsyncFuture_;
}
- TStatus DoApplyAsyncChanges(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext&) final {
+ TStatus DoApplyAsyncChanges(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) final {
output = input;
AsyncFuture_.GetValue();
- FullResolvedIds_.insert(ResolvedIds_->begin(), ResolvedIds_->end());
- ResolvedIds_->clear();
+ if (!DbResolverResponse_->Success) {
+ ctx.IssueManager.AddIssues(DbResolverResponse_->Issues);
+ return TStatus::Error;
+ }
+ FullResolvedIds_.insert(DbResolverResponse_->DatabaseId2Endpoint.begin(), DbResolverResponse_->DatabaseId2Endpoint.end());
+ DbResolverResponse_ = std::make_shared<NYq::TEvents::TDbResolverResponse>();
FillSettingsWithResolvedYdsIds(State_, FullResolvedIds_);
return TStatus::Ok;
}
@@ -59,7 +66,7 @@ private:
const TPqState::TPtr State_;
NThreading::TFuture<void> AsyncFuture_;
TDbId2Endpoint FullResolvedIds_;
- std::shared_ptr<TDbId2Endpoint> ResolvedIds_ = std::make_shared<TDbId2Endpoint>();
+ std::shared_ptr<NYq::TEvents::TDbResolverResponse> DbResolverResponse_ = std::make_shared<NYq::TEvents::TDbResolverResponse>();
};
}
diff --git a/ydb/library/yql/providers/ydb/provider/yql_ydb_io_discovery.cpp b/ydb/library/yql/providers/ydb/provider/yql_ydb_io_discovery.cpp
index 794c725164..b837a61aa6 100644
--- a/ydb/library/yql/providers/ydb/provider/yql_ydb_io_discovery.cpp
+++ b/ydb/library/yql/providers/ydb/provider/yql_ydb_io_discovery.cpp
@@ -19,7 +19,7 @@ namespace {
using namespace NNodes;
class TYdbIODiscoveryTransformer : public TGraphTransformerBase {
- using TDbId2Endpoint = THashMap<std::pair<TString, NYq::DatabaseType>, NYq::TEvents::TEvEndpointResponse::TEndpoint>;
+using TDbId2Endpoint = THashMap<std::pair<TString, NYq::DatabaseType>, NYq::TEvents::TDbResolverResponse::TEndpoint>;
public:
TYdbIODiscoveryTransformer(TYdbState::TPtr state)
: State_(std::move(state))
@@ -59,8 +59,11 @@ public:
if (ids.empty()) {
return TStatus::Ok;
}
- AsyncFuture_ = State_->DbResolver->ResolveIds({ids, State_->DbResolver->GetTraceId()}).Apply([resolvedIds_ = ResolvedIds_](const auto& future) {
- *resolvedIds_ = future.GetValue();
+ const std::weak_ptr<NYq::TEvents::TDbResolverResponse> response = DbResolverResponse_;
+ AsyncFuture_ = State_->DbResolver->ResolveIds({ids, State_->DbResolver->GetTraceId()}).Apply([response](auto future)
+ {
+ if (const auto res = response.lock())
+ *res = std::move(future.ExtractValue());
});
return TStatus::Async;
}
@@ -69,11 +72,15 @@ public:
return AsyncFuture_;
}
- TStatus DoApplyAsyncChanges(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext&) final {
+ TStatus DoApplyAsyncChanges(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) final {
output = input;
AsyncFuture_.GetValue();
- FullResolvedIds_.insert(ResolvedIds_->begin(), ResolvedIds_->end());
- ResolvedIds_->clear();
+ if (!DbResolverResponse_->Success) {
+ ctx.IssueManager.AddIssues(DbResolverResponse_->Issues);
+ return TStatus::Error;
+ }
+ FullResolvedIds_.insert(DbResolverResponse_->DatabaseId2Endpoint.begin(), DbResolverResponse_->DatabaseId2Endpoint.end());
+ DbResolverResponse_ = std::make_shared<NYq::TEvents::TDbResolverResponse>();
auto& clusters = State_->Configuration->Clusters;
const auto& id2Clusters = State_->Configuration->DbId2Clusters;
for (const auto& [dbIdWithType, info] : FullResolvedIds_) {
@@ -96,7 +103,7 @@ private:
NThreading::TFuture<void> AsyncFuture_;
TDbId2Endpoint FullResolvedIds_;
- std::shared_ptr<TDbId2Endpoint> ResolvedIds_ = std::make_shared<TDbId2Endpoint>();
+ std::shared_ptr<NYq::TEvents::TDbResolverResponse> DbResolverResponse_ = std::make_shared<NYq::TEvents::TDbResolverResponse>();
};
}