diff options
| author | Alexey Efimov <[email protected]> | 2025-02-04 11:06:23 +0100 |
|---|---|---|
| committer | GitHub <[email protected]> | 2025-02-04 13:06:23 +0300 |
| commit | 647a02f90e702a97220c36442e07940471253c31 (patch) | |
| tree | b47e14b6ca1f3f5ea5867bc92e0adc461d45b1cc | |
| parent | 0e5fbe35b4b629236760ee6a6b5ba2be9bffcbd0 (diff) | |
improve handling of query cancelling (#14169)
| -rw-r--r-- | ydb/core/viewer/viewer_query.h | 28 |
1 files changed, 22 insertions, 6 deletions
diff --git a/ydb/core/viewer/viewer_query.h b/ydb/core/viewer/viewer_query.h index cedf79690a6..9af544038b9 100644 --- a/ydb/core/viewer/viewer_query.h +++ b/ydb/core/viewer/viewer_query.h @@ -141,12 +141,30 @@ public: Become(&TThis::StateWork, TDuration::MilliSeconds(Timeout), new TEvents::TEvWakeup()); } - void Cancelled() { + void CancelQuery() { if (SessionId) { auto event = std::make_unique<NKqp::TEvKqp::TEvCancelQueryRequest>(); event->Record.MutableRequest()->SetSessionId(SessionId); Send(NKqp::MakeKqpProxyID(SelfId().NodeId()), event.release()); + if (QueryResponse && !QueryResponse.IsDone()) { + QueryResponse.Error("QueryCancelled"); + } + } + } + + void CloseSession() { + if (SessionId) { + if (QueryResponse && !QueryResponse.IsDone()) { + CancelQuery(); + } + auto event = std::make_unique<NKqp::TEvKqp::TEvCloseSessionRequest>(); + event->Record.MutableRequest()->SetSessionId(SessionId); + Send(NKqp::MakeKqpProxyID(SelfId().NodeId()), event.release()); } + } + + void Cancelled() { + CancelQuery(); PassAway(); } @@ -160,11 +178,7 @@ public: if (QueryId) { Viewer->EndRunningQuery(QueryId, SelfId()); } - if (SessionId) { - auto event = std::make_unique<NKqp::TEvKqp::TEvCloseSessionRequest>(); - event->Record.MutableRequest()->SetSessionId(SessionId); - Send(NKqp::MakeKqpProxyID(SelfId().NodeId()), event.release()); - } + CloseSession(); TBase::PassAway(); } @@ -353,6 +367,7 @@ public: } ActorIdToProto(SelfId(), event->Record.MutableRequestActorId()); QueryResponse = MakeRequest<NKqp::TEvKqp::TEvQueryResponse>(NKqp::MakeKqpProxyID(SelfId().NodeId()), event.Release()); + } private: @@ -565,6 +580,7 @@ private: NYql::IssuesFromMessage(record.GetIssues(), issues); MakeErrorReply(jsonResponse, NYdb::TStatus(NYdb::EStatus(record.GetStatusCode()), NYdb::NAdapters::ToSdkIssues(std::move(issues)))); } + CancelQuery(); ReplyWithJsonAndPassAway(jsonResponse); } |
