summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAlexey Efimov <[email protected]>2025-02-04 11:06:23 +0100
committerGitHub <[email protected]>2025-02-04 13:06:23 +0300
commit647a02f90e702a97220c36442e07940471253c31 (patch)
treeb47e14b6ca1f3f5ea5867bc92e0adc461d45b1cc
parent0e5fbe35b4b629236760ee6a6b5ba2be9bffcbd0 (diff)
improve handling of query cancelling (#14169)
-rw-r--r--ydb/core/viewer/viewer_query.h28
1 files changed, 22 insertions, 6 deletions
diff --git a/ydb/core/viewer/viewer_query.h b/ydb/core/viewer/viewer_query.h
index cedf79690a6..9af544038b9 100644
--- a/ydb/core/viewer/viewer_query.h
+++ b/ydb/core/viewer/viewer_query.h
@@ -141,12 +141,30 @@ public:
Become(&TThis::StateWork, TDuration::MilliSeconds(Timeout), new TEvents::TEvWakeup());
}
- void Cancelled() {
+ void CancelQuery() {
if (SessionId) {
auto event = std::make_unique<NKqp::TEvKqp::TEvCancelQueryRequest>();
event->Record.MutableRequest()->SetSessionId(SessionId);
Send(NKqp::MakeKqpProxyID(SelfId().NodeId()), event.release());
+ if (QueryResponse && !QueryResponse.IsDone()) {
+ QueryResponse.Error("QueryCancelled");
+ }
+ }
+ }
+
+ void CloseSession() {
+ if (SessionId) {
+ if (QueryResponse && !QueryResponse.IsDone()) {
+ CancelQuery();
+ }
+ auto event = std::make_unique<NKqp::TEvKqp::TEvCloseSessionRequest>();
+ event->Record.MutableRequest()->SetSessionId(SessionId);
+ Send(NKqp::MakeKqpProxyID(SelfId().NodeId()), event.release());
}
+ }
+
+ void Cancelled() {
+ CancelQuery();
PassAway();
}
@@ -160,11 +178,7 @@ public:
if (QueryId) {
Viewer->EndRunningQuery(QueryId, SelfId());
}
- if (SessionId) {
- auto event = std::make_unique<NKqp::TEvKqp::TEvCloseSessionRequest>();
- event->Record.MutableRequest()->SetSessionId(SessionId);
- Send(NKqp::MakeKqpProxyID(SelfId().NodeId()), event.release());
- }
+ CloseSession();
TBase::PassAway();
}
@@ -353,6 +367,7 @@ public:
}
ActorIdToProto(SelfId(), event->Record.MutableRequestActorId());
QueryResponse = MakeRequest<NKqp::TEvKqp::TEvQueryResponse>(NKqp::MakeKqpProxyID(SelfId().NodeId()), event.Release());
+
}
private:
@@ -565,6 +580,7 @@ private:
NYql::IssuesFromMessage(record.GetIssues(), issues);
MakeErrorReply(jsonResponse, NYdb::TStatus(NYdb::EStatus(record.GetStatusCode()), NYdb::NAdapters::ToSdkIssues(std::move(issues))));
}
+ CancelQuery();
ReplyWithJsonAndPassAway(jsonResponse);
}