aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorgalaxycrab <UgnineSirdis@ydb.tech>2023-06-05 15:11:52 +0300
committergalaxycrab <UgnineSirdis@ydb.tech>2023-06-05 15:11:52 +0300
commit45c174f12cdc1defbcec3574d224d9d5be4e5b95 (patch)
tree80967404904595cd733b7e0b58dd928f8034dbf1
parent4e8aaf987f81f9c46ccee9a9f2943432d816365d (diff)
downloadydb-45c174f12cdc1defbcec3574d224d9d5be4e5b95.tar.gz
Support CancelOperation for ScriptExecution operations
-rw-r--r--ydb/core/grpc_services/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/core/grpc_services/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/core/grpc_services/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/core/grpc_services/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/core/grpc_services/rpc_cancel_operation.cpp32
-rw-r--r--ydb/core/kqp/common/CMakeLists.darwin-x86_64.txt3
-rw-r--r--ydb/core/kqp/common/CMakeLists.linux-aarch64.txt3
-rw-r--r--ydb/core/kqp/common/CMakeLists.linux-x86_64.txt3
-rw-r--r--ydb/core/kqp/common/CMakeLists.windows-x86_64.txt3
-rw-r--r--ydb/core/kqp/common/events/events.h30
-rw-r--r--ydb/core/kqp/common/events/script_executions.h35
-rw-r--r--ydb/core/kqp/common/kqp_script_executions.cpp27
-rw-r--r--ydb/core/kqp/common/kqp_script_executions.h14
-rw-r--r--ydb/core/kqp/common/simple/kqp_event_ids.h7
-rw-r--r--ydb/core/kqp/counters/kqp_counters.cpp14
-rw-r--r--ydb/core/kqp/counters/kqp_counters.h3
-rw-r--r--ydb/core/kqp/counters/kqp_db_counters.h1
-rw-r--r--ydb/core/kqp/proxy_service/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/core/kqp/proxy_service/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/core/kqp/proxy_service/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/core/kqp/proxy_service/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/core/kqp/proxy_service/kqp_proxy_service.cpp38
-rw-r--r--ydb/core/kqp/proxy_service/kqp_script_executions.cpp375
-rw-r--r--ydb/core/kqp/proxy_service/kqp_script_executions.h6
-rw-r--r--ydb/core/kqp/run_script_actor/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/core/kqp/run_script_actor/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/core/kqp/run_script_actor/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/core/kqp/run_script_actor/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp175
-rw-r--r--ydb/core/kqp/session_actor/kqp_session_actor.cpp19
-rw-r--r--ydb/core/kqp/ut/service/kqp_query_service_ut.cpp34
-rw-r--r--ydb/core/protos/kqp.proto23
32 files changed, 749 insertions, 108 deletions
diff --git a/ydb/core/grpc_services/CMakeLists.darwin-x86_64.txt b/ydb/core/grpc_services/CMakeLists.darwin-x86_64.txt
index 38a8b69bd3..9ba2c54c10 100644
--- a/ydb/core/grpc_services/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/grpc_services/CMakeLists.darwin-x86_64.txt
@@ -40,6 +40,7 @@ target_link_libraries(ydb-core-grpc_services PUBLIC
ydb-core-health_check
ydb-core-io_formats
core-kesus-tablet
+ core-kqp-common
ydb-core-protos
ydb-core-scheme
ydb-core-sys_view
diff --git a/ydb/core/grpc_services/CMakeLists.linux-aarch64.txt b/ydb/core/grpc_services/CMakeLists.linux-aarch64.txt
index 2b00cd491b..94522095a3 100644
--- a/ydb/core/grpc_services/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/grpc_services/CMakeLists.linux-aarch64.txt
@@ -41,6 +41,7 @@ target_link_libraries(ydb-core-grpc_services PUBLIC
ydb-core-health_check
ydb-core-io_formats
core-kesus-tablet
+ core-kqp-common
ydb-core-protos
ydb-core-scheme
ydb-core-sys_view
diff --git a/ydb/core/grpc_services/CMakeLists.linux-x86_64.txt b/ydb/core/grpc_services/CMakeLists.linux-x86_64.txt
index 2b00cd491b..94522095a3 100644
--- a/ydb/core/grpc_services/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/grpc_services/CMakeLists.linux-x86_64.txt
@@ -41,6 +41,7 @@ target_link_libraries(ydb-core-grpc_services PUBLIC
ydb-core-health_check
ydb-core-io_formats
core-kesus-tablet
+ core-kqp-common
ydb-core-protos
ydb-core-scheme
ydb-core-sys_view
diff --git a/ydb/core/grpc_services/CMakeLists.windows-x86_64.txt b/ydb/core/grpc_services/CMakeLists.windows-x86_64.txt
index 38a8b69bd3..9ba2c54c10 100644
--- a/ydb/core/grpc_services/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/grpc_services/CMakeLists.windows-x86_64.txt
@@ -40,6 +40,7 @@ target_link_libraries(ydb-core-grpc_services PUBLIC
ydb-core-health_check
ydb-core-io_formats
core-kesus-tablet
+ core-kqp-common
ydb-core-protos
ydb-core-scheme
ydb-core-sys_view
diff --git a/ydb/core/grpc_services/rpc_cancel_operation.cpp b/ydb/core/grpc_services/rpc_cancel_operation.cpp
index 769f4f38e6..83321a9179 100644
--- a/ydb/core/grpc_services/rpc_cancel_operation.cpp
+++ b/ydb/core/grpc_services/rpc_cancel_operation.cpp
@@ -1,10 +1,13 @@
#include "service_operation.h"
#include "operation_helpers.h"
#include "rpc_operation_request_base.h"
+#include <ydb/core/kqp/common/events/script_executions.h>
+#include <ydb/core/kqp/common/kqp.h>
#include <ydb/core/grpc_services/base/base.h>
#include <ydb/core/tx/schemeshard/schemeshard_build_index.h>
#include <ydb/core/tx/schemeshard/schemeshard_export.h>
#include <ydb/core/tx/schemeshard/schemeshard_import.h>
+#include <ydb/library/yql/public/issue/yql_issue_message.h>
#include <ydb/public/lib/operation_id/operation_id.h>
#include <library/cpp/actors/core/hfunc.h>
@@ -30,6 +33,8 @@ class TCancelOperationRPC: public TRpcOperationRequestActor<TCancelOperationRPC,
return "[CancelImport]";
case TOperationId::BUILD_INDEX:
return "[CancelIndexBuild]";
+ case TOperationId::SCRIPT_EXECUTION:
+ return "[CancelScriptExecution]";
default:
return "[Untagged]";
}
@@ -48,6 +53,13 @@ class TCancelOperationRPC: public TRpcOperationRequestActor<TCancelOperationRPC,
}
}
+ bool NeedAllocateTxId() const {
+ const Ydb::TOperationId::EKind kind = OperationId.GetKind();
+ return kind == TOperationId::EXPORT
+ || kind == TOperationId::IMPORT
+ || kind == TOperationId::BUILD_INDEX;
+ }
+
void Handle(TEvExport::TEvCancelExportResponse::TPtr& ev) {
const auto& record = ev->Get()->Record.GetResponse();
@@ -93,11 +105,17 @@ public:
}
break;
+ case TOperationId::SCRIPT_EXECUTION:
+ SendCancelScriptExecutionOperation();
+ break;
+
default:
return Reply(StatusIds::UNSUPPORTED, TIssuesIds::DEFAULT_ERROR, "Unknown operation kind");
}
- AllocateTxId();
+ if (NeedAllocateTxId()) {
+ AllocateTxId();
+ }
} catch (const yexception&) {
return Reply(StatusIds::BAD_REQUEST, TIssuesIds::DEFAULT_ERROR, "Invalid operation id");
}
@@ -110,15 +128,25 @@ public:
hFunc(TEvExport::TEvCancelExportResponse, Handle);
hFunc(TEvImport::TEvCancelImportResponse, Handle);
hFunc(TEvIndexBuilder::TEvCancelResponse, Handle);
+ hFunc(NKqp::TEvCancelScriptExecutionOperationResponse, Handle);
default:
return StateBase(ev);
}
}
+ void Handle(NKqp::TEvCancelScriptExecutionOperationResponse::TPtr& ev) {
+ google::protobuf::RepeatedPtrField<Ydb::Issue::IssueMessage> issuesProto;
+ NYql::IssuesToMessage(ev->Get()->Issues, &issuesProto);
+ Reply(ev->Get()->Status, issuesProto);
+ }
+
+ void SendCancelScriptExecutionOperation() {
+ Send(NKqp::MakeKqpProxyID(SelfId().NodeId()), new NKqp::TEvCancelScriptExecutionOperation(DatabaseName, OperationId));
+ }
+
private:
TOperationId OperationId;
ui64 RawOperationId = 0;
-
}; // TCancelOperationRPC
void DoCancelOperationRequest(std::unique_ptr<IRequestNoOpCtx> p, const IFacilityProvider& f) {
diff --git a/ydb/core/kqp/common/CMakeLists.darwin-x86_64.txt b/ydb/core/kqp/common/CMakeLists.darwin-x86_64.txt
index c40b6b7675..99e0026d26 100644
--- a/ydb/core/kqp/common/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/kqp/common/CMakeLists.darwin-x86_64.txt
@@ -46,6 +46,8 @@ target_link_libraries(core-kqp-common PUBLIC
yql-dq-actors
yql-dq-common
parser-pg_wrapper-interface
+ public-lib-operation_id
+ lib-operation_id-protos
core-grpc_services-cancelation
library-cpp-lwtrace
tools-enum_parser-enum_serialization_runtime
@@ -55,6 +57,7 @@ target_sources(core-kqp-common PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_resolve.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_ru_calc.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_yql.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_script_executions.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_timeouts.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_lwtrace_probes.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_types.cpp
diff --git a/ydb/core/kqp/common/CMakeLists.linux-aarch64.txt b/ydb/core/kqp/common/CMakeLists.linux-aarch64.txt
index c469afdc8b..3dcd30969b 100644
--- a/ydb/core/kqp/common/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/kqp/common/CMakeLists.linux-aarch64.txt
@@ -47,6 +47,8 @@ target_link_libraries(core-kqp-common PUBLIC
yql-dq-actors
yql-dq-common
parser-pg_wrapper-interface
+ public-lib-operation_id
+ lib-operation_id-protos
core-grpc_services-cancelation
library-cpp-lwtrace
tools-enum_parser-enum_serialization_runtime
@@ -56,6 +58,7 @@ target_sources(core-kqp-common PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_resolve.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_ru_calc.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_yql.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_script_executions.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_timeouts.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_lwtrace_probes.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_types.cpp
diff --git a/ydb/core/kqp/common/CMakeLists.linux-x86_64.txt b/ydb/core/kqp/common/CMakeLists.linux-x86_64.txt
index c469afdc8b..3dcd30969b 100644
--- a/ydb/core/kqp/common/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/kqp/common/CMakeLists.linux-x86_64.txt
@@ -47,6 +47,8 @@ target_link_libraries(core-kqp-common PUBLIC
yql-dq-actors
yql-dq-common
parser-pg_wrapper-interface
+ public-lib-operation_id
+ lib-operation_id-protos
core-grpc_services-cancelation
library-cpp-lwtrace
tools-enum_parser-enum_serialization_runtime
@@ -56,6 +58,7 @@ target_sources(core-kqp-common PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_resolve.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_ru_calc.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_yql.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_script_executions.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_timeouts.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_lwtrace_probes.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_types.cpp
diff --git a/ydb/core/kqp/common/CMakeLists.windows-x86_64.txt b/ydb/core/kqp/common/CMakeLists.windows-x86_64.txt
index c40b6b7675..99e0026d26 100644
--- a/ydb/core/kqp/common/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/kqp/common/CMakeLists.windows-x86_64.txt
@@ -46,6 +46,8 @@ target_link_libraries(core-kqp-common PUBLIC
yql-dq-actors
yql-dq-common
parser-pg_wrapper-interface
+ public-lib-operation_id
+ lib-operation_id-protos
core-grpc_services-cancelation
library-cpp-lwtrace
tools-enum_parser-enum_serialization_runtime
@@ -55,6 +57,7 @@ target_sources(core-kqp-common PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_resolve.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_ru_calc.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_yql.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_script_executions.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_timeouts.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_lwtrace_probes.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_types.cpp
diff --git a/ydb/core/kqp/common/events/events.h b/ydb/core/kqp/common/events/events.h
index 9a0702f849..6a98198cb6 100644
--- a/ydb/core/kqp/common/events/events.h
+++ b/ydb/core/kqp/common/events/events.h
@@ -8,6 +8,7 @@
#include <ydb/core/kqp/common/shutdown/events.h>
#include <ydb/public/api/protos/draft/ydb_query.pb.h>
#include <ydb/library/yql/dq/actors/dq.h>
+#include <ydb/library/yql/public/issue/yql_issue_message.h>
#include <library/cpp/actors/core/event_pb.h>
#include <library/cpp/actors/core/event_local.h>
@@ -32,6 +33,9 @@ struct TEvKqp {
struct TEvPingSessionRequest : public TEventPB<TEvPingSessionRequest,
NKikimrKqp::TEvPingSessionRequest, TKqpEvents::EvPingSessionRequest> {};
+ struct TEvCancelQueryRequest : public TEventPB<TEvCancelQueryRequest,
+ NKikimrKqp::TEvCancelQueryRequest, TKqpEvents::EvCancelQueryRequest> {};
+
using TEvCompileRequest = NPrivateEvents::TEvCompileRequest;
using TEvRecompileRequest = NPrivateEvents::TEvRecompileRequest;
@@ -77,6 +81,9 @@ struct TEvKqp {
struct TEvPingSessionResponse : public TEventPB<TEvPingSessionResponse,
NKikimrKqp::TEvPingSessionResponse, TKqpEvents::EvPingSessionResponse> {};
+ struct TEvCancelQueryResponse : public TEventPB<TEvCancelQueryResponse,
+ NKikimrKqp::TEvCancelQueryResponse, TKqpEvents::EvCancelQueryResponse> {};
+
struct TEvKqpProxyPublishRequest :
public TEventLocal<TEvKqpProxyPublishRequest, TKqpEvents::EvKqpProxyPublishRequest> {};
@@ -119,6 +126,29 @@ struct TEvKqp {
struct TEvFetchScriptResultsResponse : public TEventPB<TEvFetchScriptResultsResponse, NKikimrKqp::TEvFetchScriptResultsResponse, TKqpEvents::EvFetchScriptResultsResponse> {
};
+
+ struct TEvCancelScriptExecutionRequest : public TEventPB<TEvCancelScriptExecutionRequest, NKikimrKqp::TEvCancelScriptExecutionRequest, TKqpEvents::EvCancelScriptExecutionRequest> {
+ };
+
+ struct TEvCancelScriptExecutionResponse : public TEventPB<TEvCancelScriptExecutionResponse, NKikimrKqp::TEvCancelScriptExecutionResponse, TKqpEvents::EvCancelScriptExecutionResponse> {
+ TEvCancelScriptExecutionResponse() = default;
+
+ explicit TEvCancelScriptExecutionResponse(Ydb::StatusIds::StatusCode status, const NYql::TIssues& issues = {}) {
+ Record.SetStatus(status);
+ NYql::IssuesToMessage(issues, Record.MutableIssues());
+ }
+
+ TEvCancelScriptExecutionResponse(Ydb::StatusIds::StatusCode status, const TString& message)
+ : TEvCancelScriptExecutionResponse(status, TextToIssues(message))
+ {}
+
+ private:
+ static NYql::TIssues TextToIssues(const TString& message) {
+ NYql::TIssues issues;
+ issues.AddIssue(message);
+ return issues;
+ }
+ };
};
} // namespace NKikimr::NKqp
diff --git a/ydb/core/kqp/common/events/script_executions.h b/ydb/core/kqp/common/events/script_executions.h
index bc808241c0..56ba62daf6 100644
--- a/ydb/core/kqp/common/events/script_executions.h
+++ b/ydb/core/kqp/common/events/script_executions.h
@@ -66,4 +66,39 @@ struct TEvListScriptExecutionOperationsResponse : public NActors::TEventLocal<TE
std::vector<Ydb::Operations::Operation> Operations;
};
+struct TEvCancelScriptExecutionOperation : public NActors::TEventLocal<TEvCancelScriptExecutionOperation, TKqpScriptExecutionEvents::EvCancelScriptExecutionOperation> {
+ explicit TEvCancelScriptExecutionOperation(const TString& database, const NOperationId::TOperationId& id)
+ : Database(database)
+ , OperationId(id)
+ {
+ }
+
+ TString Database;
+ NOperationId::TOperationId OperationId;
+};
+
+struct TEvCancelScriptExecutionOperationResponse : public NActors::TEventLocal<TEvCancelScriptExecutionOperationResponse, TKqpScriptExecutionEvents::EvCancelScriptExecutionOperationResponse> {
+ TEvCancelScriptExecutionOperationResponse() = default;
+
+ TEvCancelScriptExecutionOperationResponse(Ydb::StatusIds::StatusCode status, NYql::TIssues issues = {})
+ : Status(status)
+ , Issues(std::move(issues))
+ {
+ }
+
+ Ydb::StatusIds::StatusCode Status;
+ NYql::TIssues Issues;
+};
+
+struct TEvScriptExecutionFinished : public NActors::TEventLocal<TEvScriptExecutionFinished, TKqpScriptExecutionEvents::EvScriptExecutionFinished> {
+ TEvScriptExecutionFinished(Ydb::StatusIds::StatusCode status, NYql::TIssues issues = {})
+ : Status(status)
+ , Issues(std::move(issues))
+ {
+ }
+
+ Ydb::StatusIds::StatusCode Status;
+ NYql::TIssues Issues;
+};
+
} // namespace NKikimr::NKqp
diff --git a/ydb/core/kqp/common/kqp_script_executions.cpp b/ydb/core/kqp/common/kqp_script_executions.cpp
new file mode 100644
index 0000000000..f231dae077
--- /dev/null
+++ b/ydb/core/kqp/common/kqp_script_executions.cpp
@@ -0,0 +1,27 @@
+#include "kqp_script_executions.h"
+
+#include <ydb/public/lib/operation_id/protos/operation_id.pb.h>
+
+namespace NKikimr::NKqp {
+
+TString ScriptExecutionOperationFromExecutionId(const TString& executionId) {
+ Ydb::TOperationId operationId;
+ operationId.SetKind(Ydb::TOperationId::SCRIPT_EXECUTION);
+ NOperationId::AddOptionalValue(operationId, "actor_id", executionId);
+ return NOperationId::ProtoToString(operationId);
+}
+
+TMaybe<TString> ScriptExecutionFromOperation(const TString& operationId) {
+ NOperationId::TOperationId operation(operationId);
+ return ScriptExecutionFromOperation(operation);
+}
+
+TMaybe<TString> ScriptExecutionFromOperation(const NOperationId::TOperationId& operationId) {
+ const auto& values = operationId.GetValue("actor_id");
+ if (values.empty() || !values[0]) {
+ return Nothing();
+ }
+ return *values[0];
+}
+
+} // namespace NKikimr::NKqp
diff --git a/ydb/core/kqp/common/kqp_script_executions.h b/ydb/core/kqp/common/kqp_script_executions.h
new file mode 100644
index 0000000000..d920d28b59
--- /dev/null
+++ b/ydb/core/kqp/common/kqp_script_executions.h
@@ -0,0 +1,14 @@
+#pragma once
+#include <ydb/public/lib/operation_id/operation_id.h>
+
+#include <util/generic/string.h>
+#include <util/generic/maybe.h>
+
+
+namespace NKikimr::NKqp {
+
+TString ScriptExecutionOperationFromExecutionId(const TString& executionId);
+TMaybe<TString> ScriptExecutionFromOperation(const TString& operationId);
+TMaybe<TString> ScriptExecutionFromOperation(const NOperationId::TOperationId& operationId);
+
+} // namespace NKikimr::NKqp
diff --git a/ydb/core/kqp/common/simple/kqp_event_ids.h b/ydb/core/kqp/common/simple/kqp_event_ids.h
index d21cd64c1e..a8faace1ea 100644
--- a/ydb/core/kqp/common/simple/kqp_event_ids.h
+++ b/ydb/core/kqp/common/simple/kqp_event_ids.h
@@ -35,6 +35,10 @@ struct TKqpEvents {
EvFetchScriptResultsRequest,
EvFetchScriptResultsResponse,
EvKqpProxyPublishRequest,
+ EvCancelScriptExecutionRequest,
+ EvCancelScriptExecutionResponse,
+ EvCancelQueryRequest,
+ EvCancelQueryResponse,
};
static_assert (EvCompileInvalidateRequest + 1 == EvAbortExecution);
@@ -128,6 +132,9 @@ struct TKqpScriptExecutionEvents {
EvGetScriptExecutionOperationResponse,
EvListScriptExecutionOperations,
EvListScriptExecutionOperationsResponse,
+ EvCancelScriptExecutionOperation,
+ EvCancelScriptExecutionOperationResponse,
+ EvScriptExecutionFinished,
};
};
diff --git a/ydb/core/kqp/counters/kqp_counters.cpp b/ydb/core/kqp/counters/kqp_counters.cpp
index 2e5c72f45f..fb3f5bcd43 100644
--- a/ydb/core/kqp/counters/kqp_counters.cpp
+++ b/ydb/core/kqp/counters/kqp_counters.cpp
@@ -73,6 +73,7 @@ void TKqpCountersBase::Init() {
CloseSessionRequests = KqpGroup->GetCounter("Requests/CloseSession", true);
CreateSessionRequests = KqpGroup->GetCounter("Requests/CreateSession", true);
PingSessionRequests = KqpGroup->GetCounter("Requests/PingSession", true);
+ CancelQueryRequests = KqpGroup->GetCounter("Requests/CancelQuery", true);
RequestBytes = KqpGroup->GetCounter("Requests/Bytes", true);
YdbRequestBytes = YdbGroup->GetNamedCounter("name", "table.query.request.bytes", true);
@@ -306,6 +307,12 @@ void TKqpCountersBase::ReportQueryRequest(ui64 requestBytes, ui64 parametersByte
*YdbParametersBytes += parametersBytes;
}
+void TKqpCountersBase::ReportCancelQuery(ui64 requestSize) {
+ CancelQueryRequests->Inc();
+ *RequestBytes += requestSize;
+ *YdbRequestBytes += requestSize;
+}
+
void TKqpCountersBase::ReportQueryWithRangeScan() {
QueriesWithRangeScan->Inc();
}
@@ -860,6 +867,13 @@ void TKqpCounters::ReportQueryRequest(TKqpDbCountersPtr dbCounters, ui64 request
}
}
+void TKqpCounters::ReportCancelQuery(TKqpDbCountersPtr dbCounters, ui64 requestSize) {
+ TKqpCountersBase::ReportCancelQuery(requestSize);
+ if (dbCounters) {
+ dbCounters->ReportCancelQuery(requestSize);
+ }
+}
+
void TKqpCounters::ReportQueryWithRangeScan(TKqpDbCountersPtr dbCounters) {
TKqpCountersBase::ReportQueryWithRangeScan();
if (dbCounters) {
diff --git a/ydb/core/kqp/counters/kqp_counters.h b/ydb/core/kqp/counters/kqp_counters.h
index 1a0e8cb80c..ec906a81fc 100644
--- a/ydb/core/kqp/counters/kqp_counters.h
+++ b/ydb/core/kqp/counters/kqp_counters.h
@@ -42,6 +42,7 @@ protected:
void ReportPingSession(ui64 requestSize);
void ReportCloseSession(ui64 requestSize);
void ReportQueryRequest(ui64 requestBytes, ui64 parametersBytes, ui64 queryBytes);
+ void ReportCancelQuery(ui64 requestSize);
void ReportQueryWithRangeScan();
void ReportQueryWithFullScan();
@@ -112,6 +113,7 @@ protected:
::NMonitoring::TDynamicCounters::TCounterPtr CloseSessionRequests;
::NMonitoring::TDynamicCounters::TCounterPtr CreateSessionRequests;
::NMonitoring::TDynamicCounters::TCounterPtr PingSessionRequests;
+ ::NMonitoring::TDynamicCounters::TCounterPtr CancelQueryRequests;
::NMonitoring::TDynamicCounters::TCounterPtr RequestBytes;
::NMonitoring::TDynamicCounters::TCounterPtr YdbRequestBytes;
@@ -272,6 +274,7 @@ public:
void ReportQueryAction(TKqpDbCountersPtr dbCounters, NKikimrKqp::EQueryAction action);
void ReportQueryType(TKqpDbCountersPtr dbCounters, NKikimrKqp::EQueryType type);
void ReportQueryRequest(TKqpDbCountersPtr dbCounters, ui64 requestBytes, ui64 parametersBytes, ui64 queryBytes);
+ void ReportCancelQuery(TKqpDbCountersPtr dbCounters, ui64 requestSize);
void ReportResponseStatus(TKqpDbCountersPtr dbCounters, ui64 responseSize, Ydb::StatusIds::StatusCode ydbStatus);
void ReportResultsBytes(TKqpDbCountersPtr dbCounters, ui64 resultsSize);
diff --git a/ydb/core/kqp/counters/kqp_db_counters.h b/ydb/core/kqp/counters/kqp_db_counters.h
index d7ec834f5f..be45f77895 100644
--- a/ydb/core/kqp/counters/kqp_db_counters.h
+++ b/ydb/core/kqp/counters/kqp_db_counters.h
@@ -23,6 +23,7 @@ namespace NKqp {
XX(DB_KQP_REQ_QUERY_OTHER, OtherQueryRequests) \
XX(DB_KQP_CLOSE_SESSION_REQ, CloseSessionRequests) \
XX(DB_KQP_PING_SESSION_REQ, PingSessionRequests) \
+ XX(DB_KQP_CANCEL_QUERY_REQ, CancelQueryRequests) \
XX(DB_KQP_REQUEST_BYTES, RequestBytes) \
XX(DB_KQP_QUERY_BYTES, QueryBytes) \
XX(DB_KQP_PARAMS_BYTES, ParametersBytes) \
diff --git a/ydb/core/kqp/proxy_service/CMakeLists.darwin-x86_64.txt b/ydb/core/kqp/proxy_service/CMakeLists.darwin-x86_64.txt
index 455ba7bfbd..2ce9ff4dd0 100644
--- a/ydb/core/kqp/proxy_service/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/kqp/proxy_service/CMakeLists.darwin-x86_64.txt
@@ -21,6 +21,7 @@ target_link_libraries(core-kqp-proxy_service PUBLIC
ydb-core-base
core-cms-console
core-kqp-common
+ kqp-common-events
core-kqp-counters
core-kqp-run_script_actor
ydb-core-mind
diff --git a/ydb/core/kqp/proxy_service/CMakeLists.linux-aarch64.txt b/ydb/core/kqp/proxy_service/CMakeLists.linux-aarch64.txt
index 631a038f59..fc1b20b171 100644
--- a/ydb/core/kqp/proxy_service/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/kqp/proxy_service/CMakeLists.linux-aarch64.txt
@@ -22,6 +22,7 @@ target_link_libraries(core-kqp-proxy_service PUBLIC
ydb-core-base
core-cms-console
core-kqp-common
+ kqp-common-events
core-kqp-counters
core-kqp-run_script_actor
ydb-core-mind
diff --git a/ydb/core/kqp/proxy_service/CMakeLists.linux-x86_64.txt b/ydb/core/kqp/proxy_service/CMakeLists.linux-x86_64.txt
index 631a038f59..fc1b20b171 100644
--- a/ydb/core/kqp/proxy_service/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/kqp/proxy_service/CMakeLists.linux-x86_64.txt
@@ -22,6 +22,7 @@ target_link_libraries(core-kqp-proxy_service PUBLIC
ydb-core-base
core-cms-console
core-kqp-common
+ kqp-common-events
core-kqp-counters
core-kqp-run_script_actor
ydb-core-mind
diff --git a/ydb/core/kqp/proxy_service/CMakeLists.windows-x86_64.txt b/ydb/core/kqp/proxy_service/CMakeLists.windows-x86_64.txt
index 455ba7bfbd..2ce9ff4dd0 100644
--- a/ydb/core/kqp/proxy_service/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/kqp/proxy_service/CMakeLists.windows-x86_64.txt
@@ -21,6 +21,7 @@ target_link_libraries(core-kqp-proxy_service PUBLIC
ydb-core-base
core-cms-console
core-kqp-common
+ kqp-common-events
core-kqp-counters
core-kqp-run_script_actor
ydb-core-mind
diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp
index 089211a488..7c81c9d5f9 100644
--- a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp
+++ b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp
@@ -705,6 +705,37 @@ public:
Send(targetId, ev->Release().Release(), IEventHandle::FlagTrackDelivery, requestId);
}
+ void Handle(TEvKqp::TEvCancelQueryRequest::TPtr& ev) {
+ auto& event = ev->Get()->Record;
+ auto& request = event.GetRequest();
+
+ auto traceId = event.GetTraceId();
+ TKqpRequestInfo requestInfo(traceId);
+ auto sessionId = request.GetSessionId();
+ ui64 requestId = PendingRequests.RegisterRequest(ev->Sender, ev->Cookie, traceId, TKqpEvents::EvCancelQueryRequest);
+ const TKqpSessionInfo* sessionInfo = LocalSessions->FindPtr(sessionId);
+ auto dbCounters = sessionInfo ? sessionInfo->DbCounters : nullptr;
+ KQP_PROXY_LOG_D("Received cancel query request, request_id: " << requestId << ", trace_id: " << traceId);
+ Counters->ReportCancelQuery(dbCounters, request.ByteSize());
+
+ PendingRequests.SetSessionId(requestId, sessionId, dbCounters);
+
+ TActorId targetId;
+ if (sessionInfo) {
+ targetId = sessionInfo->WorkerId;
+ LocalSessions->StopIdleCheck(sessionInfo);
+ } else {
+ targetId = TryGetSessionTargetActor(sessionId, requestInfo, requestId);
+ if (!targetId) {
+ return;
+ }
+ }
+
+ Send(targetId, ev->Release().Release(), IEventHandle::FlagTrackDelivery, requestId);
+ KQP_PROXY_LOG_D("Sent request to target, requestId: " << requestId
+ << ", targetId: " << targetId << ", sessionId: " << sessionId);
+ }
+
template<typename TEvent>
void ForwardEvent(TEvent ev) {
ui64 requestId = ev->Cookie;
@@ -1120,6 +1151,8 @@ public:
hFunc(TEvKqp::TEvProcessResponse, ForwardEvent);
hFunc(TEvKqp::TEvCreateSessionRequest, Handle);
hFunc(TEvKqp::TEvPingSessionRequest, Handle);
+ hFunc(TEvKqp::TEvCancelQueryRequest, Handle);
+ hFunc(TEvKqp::TEvCancelQueryResponse, ForwardEvent);
hFunc(TEvKqp::TEvCloseSessionResponse, Handle);
hFunc(TEvKqp::TEvPingSessionResponse, ForwardEvent);
hFunc(TEvKqp::TEvInitiateShutdownRequest, Handle);
@@ -1131,6 +1164,7 @@ public:
hFunc(TEvPrivate::TEvScriptExecutionsTablesCreationFinished, Handle);
hFunc(NKqp::TEvGetScriptExecutionOperation, Handle);
hFunc(NKqp::TEvListScriptExecutionOperations, Handle);
+ hFunc(NKqp::TEvCancelScriptExecutionOperation, Handle);
default:
Y_FAIL("TKqpProxyService: unexpected event type: %" PRIx32 " event: %s",
ev->GetTypeRewrite(), ev->ToString().data());
@@ -1336,6 +1370,10 @@ private:
Register(CreateListScriptExecutionOperationsActor(std::move(ev)));
}
+ void Handle(NKqp::TEvCancelScriptExecutionOperation::TPtr& ev) {
+ Register(CreateCancelScriptExecutionOperationActor(std::move(ev)));
+ }
+
private:
NYql::NLog::YqlLoggerScope YqlLoggerScope;
NKikimrConfig::TLogConfig LogConfig;
diff --git a/ydb/core/kqp/proxy_service/kqp_script_executions.cpp b/ydb/core/kqp/proxy_service/kqp_script_executions.cpp
index e7dc2021f0..a0a3677d37 100644
--- a/ydb/core/kqp/proxy_service/kqp_script_executions.cpp
+++ b/ydb/core/kqp/proxy_service/kqp_script_executions.cpp
@@ -2,6 +2,8 @@
#include <ydb/core/base/path.h>
#include <ydb/core/base/tablet_pipe.h>
+#include <ydb/core/kqp/common/events/events.h>
+#include <ydb/core/kqp/common/kqp_script_executions.h>
#include <ydb/core/kqp/run_script_actor/kqp_run_script_actor.h>
#include <ydb/core/protos/services.pb.h>
#include <ydb/core/tx/scheme_cache/scheme_cache.h>
@@ -18,6 +20,7 @@
#include <library/cpp/actors/core/actor_bootstrapped.h>
#include <library/cpp/actors/core/hfunc.h>
+#include <library/cpp/actors/core/interconnect.h>
#include <library/cpp/actors/core/log.h>
#include <library/cpp/protobuf/json/json2proto.h>
#include <library/cpp/protobuf/json/proto2json.h>
@@ -27,26 +30,6 @@
namespace NKikimr::NKqp {
-TString ScriptExecutionOperationFromExecutionId(const TString& executionId) {
- Ydb::TOperationId operationId;
- operationId.SetKind(Ydb::TOperationId::SCRIPT_EXECUTION);
- NOperationId::AddOptionalValue(operationId, "actor_id", executionId);
- return NOperationId::ProtoToString(operationId);
-}
-
-TMaybe<TString> ScriptExecutionFromOperation(const TString& operationId) {
- NOperationId::TOperationId operation(operationId);
- return ScriptExecutionFromOperation(operation);
-}
-
-TMaybe<TString> ScriptExecutionFromOperation(const NOperationId::TOperationId& operationId) {
- const auto& values = operationId.GetValue("actor_id");
- if (values.empty() || !values[0]) {
- return Nothing();
- }
- return *values[0];
-}
-
namespace {
#define KQP_PROXY_LOG_T(stream) LOG_TRACE_S(*TlsActivationContext, NKikimrServices::KQP_PROXY, stream)
@@ -85,6 +68,7 @@ struct TEvPrivate {
enum EEv : ui32 {
EvCreateScriptOperationResponse = EventSpaceBegin(NActors::TEvents::ES_PRIVATE),
EvCreateTableResponse,
+ EvLeaseCheckResult,
EvEnd
};
@@ -113,13 +97,26 @@ struct TEvPrivate {
struct TEvCreateTableResponse : public NActors::TEventLocal<TEvCreateTableResponse, EvCreateTableResponse> {
TEvCreateTableResponse() = default;
};
+
+ struct TEvLeaseCheckResult : public NActors::TEventLocal<TEvLeaseCheckResult, EvLeaseCheckResult> {
+ TEvLeaseCheckResult(Ydb::StatusIds::StatusCode statusCode, NYql::TIssues&& issues, TMaybe<Ydb::StatusIds::StatusCode> operationStatus)
+ : Status(statusCode)
+ , Issues(std::move(issues))
+ , OperationStatus(operationStatus)
+ {
+ }
+
+ const Ydb::StatusIds::StatusCode Status;
+ const NYql::TIssues Issues;
+ const TMaybe<Ydb::StatusIds::StatusCode> OperationStatus;
+ };
};
class TQueryBase : public NKikimr::TQueryBase {
public:
- TQueryBase()
- : NKikimr::TQueryBase(NKikimrServices::KQP_PROXY)
+ TQueryBase(TString sessionId = {})
+ : NKikimr::TQueryBase(NKikimrServices::KQP_PROXY, sessionId)
{}
};
@@ -566,7 +563,59 @@ private:
NActors::TActorId RunScriptActorId;
};
-class TScriptExecutionFinisher : public TQueryBase {
+class TScriptExecutionFinisherBase : public TQueryBase {
+public:
+ using TQueryBase::TQueryBase;
+
+ void FinishScriptExecution(const TString& database, const TString& executionId, Ydb::StatusIds::StatusCode operationStatus, Ydb::Query::ExecStatus execStatus, const NYql::TIssues& issues, TTxControl txControl = TTxControl::ContinueAndCommitTx()) {
+ TString sql = R"(
+ DECLARE $database AS Text;
+ DECLARE $execution_id AS Text;
+ DECLARE $operation_status AS Int32;
+ DECLARE $execution_status AS Int32;
+ DECLARE $issues AS JsonDocument;
+
+ UPDATE `.metadata/script_executions`
+ SET
+ operation_status = $operation_status,
+ execution_status = $execution_status,
+ issues = $issues,
+ end_ts = CurrentUtcTimestamp()
+ WHERE database = $database AND execution_id = $execution_id;
+
+ DELETE FROM `.metadata/script_execution_leases`
+ WHERE database = $database AND execution_id = $execution_id;
+ )";
+
+ NYdb::TParamsBuilder params;
+ params
+ .AddParam("$database")
+ .Utf8(database)
+ .Build()
+ .AddParam("$execution_id")
+ .Utf8(executionId)
+ .Build()
+ .AddParam("$operation_status")
+ .Int32(operationStatus)
+ .Build()
+ .AddParam("$execution_status")
+ .Int32(execStatus)
+ .Build()
+ .AddParam("$issues")
+ .JsonDocument(SerializeIssues(issues))
+ .Build();
+
+ RunDataQuery(sql, &params, txControl);
+ }
+
+ void FinishScriptExecution(const TString& database, const TString& executionId, Ydb::StatusIds::StatusCode operationStatus, Ydb::Query::ExecStatus execStatus, const TString& message, TTxControl txControl = TTxControl::ContinueAndCommitTx()) {
+ NYql::TIssues issues;
+ issues.AddIssue(message);
+ FinishScriptExecution(database, executionId, operationStatus, execStatus, issues, txControl);
+ }
+};
+
+class TScriptExecutionFinisher : public TScriptExecutionFinisherBase {
public:
TScriptExecutionFinisher(
const TString& executionId,
@@ -607,73 +656,40 @@ public:
}
void OnQueryResult() override {
- if (ResultSets.size() != 1) {
- Finish(Ydb::StatusIds::INTERNAL_ERROR, "Unexpected database response");
- return;
- }
- NYdb::TResultSetParser result(ResultSets[0]);
- if (result.RowsCount() == 0) {
- Finish(Ydb::StatusIds::BAD_REQUEST, "No such execution");
- return;
- }
-
- result.TryNextRow();
-
- const TMaybe<i64> leaseGenerationInDatabase = result.ColumnParser(0).GetOptionalInt64();
- if (!leaseGenerationInDatabase) {
- Finish(Ydb::StatusIds::INTERNAL_ERROR, "Unknown lease generation");
- return;
- }
-
- if (LeaseGeneration != static_cast<ui64>(*leaseGenerationInDatabase)) {
- Finish(Ydb::StatusIds::PRECONDITION_FAILED, "Lease was lost");
- return;
- }
-
- TString sql = R"(
- DECLARE $database AS Text;
- DECLARE $execution_id AS Text;
- DECLARE $operation_status AS Int32;
- DECLARE $execution_status AS Int32;
- DECLARE $issues AS JsonDocument;
-
- UPDATE `.metadata/script_executions`
- SET
- operation_status = $operation_status,
- execution_status = $execution_status,
- issues = $issues,
- end_ts = CurrentUtcTimestamp()
- WHERE database = $database AND execution_id = $execution_id;
+ if (!FinishWasRun) {
+ if (ResultSets.size() != 1) {
+ Finish(Ydb::StatusIds::INTERNAL_ERROR, "Unexpected database response");
+ return;
+ }
+ NYdb::TResultSetParser result(ResultSets[0]);
+ if (result.RowsCount() == 0) {
+ Finish(Ydb::StatusIds::BAD_REQUEST, "No such execution");
+ return;
+ }
- DELETE FROM `.metadata/script_execution_leases`
- WHERE database = $database AND execution_id = $execution_id;
- )";
+ result.TryNextRow();
- NYdb::TParamsBuilder params;
- params
- .AddParam("$database")
- .Utf8(Database)
- .Build()
- .AddParam("$execution_id")
- .Utf8(ExecutionId)
- .Build()
- .AddParam("$operation_status")
- .Int32(OperationStatus)
- .Build()
- .AddParam("$execution_status")
- .Int32(ExecStatus)
- .Build()
- .AddParam("$issues")
- .JsonDocument(SerializeIssues(Issues))
- .Build();
+ const TMaybe<i64> leaseGenerationInDatabase = result.ColumnParser(0).GetOptionalInt64();
+ if (!leaseGenerationInDatabase) {
+ Finish(Ydb::StatusIds::INTERNAL_ERROR, "Unknown lease generation");
+ return;
+ }
- Y_UNUSED(DeserializeIssues);
+ if (LeaseGeneration != static_cast<ui64>(*leaseGenerationInDatabase)) {
+ Finish(Ydb::StatusIds::PRECONDITION_FAILED, "Lease was lost");
+ return;
+ }
- RunDataQuery(sql, &params, TTxControl::ContinueAndCommitTx());
+ FinishScriptExecution(Database, ExecutionId, OperationStatus, ExecStatus, Issues);
+ FinishWasRun = true;
+ } else {
+ Finish();
+ }
}
void OnFinish(Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues) override {
KQP_PROXY_LOG_D("Finish script execution operation. ExecutionId: " << ExecutionId << ". Lease generation: " << LeaseGeneration << ": " << Ydb::StatusIds::StatusCode_Name(status) << ". Issues: " << issues.ToOneLineString());
+ Send(Owner, new TEvScriptExecutionFinished(status, std::move(issues)));
}
private:
@@ -683,6 +699,7 @@ private:
const Ydb::StatusIds::StatusCode OperationStatus;
const Ydb::Query::ExecStatus ExecStatus;
const NYql::TIssues Issues;
+ bool FinishWasRun = false;
};
class TGetScriptExecutionOperationActor : public TQueryBase {
@@ -948,6 +965,196 @@ private:
std::vector<Ydb::Operations::Operation> Operations;
};
+class TCheckLeaseStatusActor : public TScriptExecutionFinisherBase {
+public:
+ TCheckLeaseStatusActor(const TString& database, const TString& executionId, Ydb::StatusIds::StatusCode statusOnExpiredLease = Ydb::StatusIds::ABORTED)
+ : Database(database)
+ , ExecutionId(executionId)
+ , StatusOnExpiredLease(statusOnExpiredLease)
+ {}
+
+ void OnRunQuery() override {
+ const TString sql = R"(
+ DECLARE $database AS Text;
+ DECLARE $execution_id AS Text;
+
+ SELECT operation_status FROM `.metadata/script_executions`
+ WHERE database = $database AND execution_id = $execution_id;
+
+ SELECT lease_deadline FROM `.metadata/script_execution_leases`
+ WHERE database = $database AND execution_id = $execution_id;
+ )";
+
+ NYdb::TParamsBuilder params;
+ params
+ .AddParam("$database")
+ .Utf8(Database)
+ .Build()
+ .AddParam("$execution_id")
+ .Utf8(ExecutionId)
+ .Build();
+
+ RunDataQuery(sql, &params, TTxControl::BeginTx());
+ }
+
+ void OnQueryResult() override {
+ if (!FinishWasRun) {
+ if (ResultSets.size() != 2) {
+ Finish(Ydb::StatusIds::INTERNAL_ERROR, "Unexpected database response");
+ return;
+ }
+ NYdb::TResultSetParser result(ResultSets[0]);
+ if (result.RowsCount() == 0) {
+ Finish(Ydb::StatusIds::BAD_REQUEST, "No such execution");
+ return;
+ }
+
+ result.TryNextRow();
+
+ TMaybe<i32> operationStatus = result.ColumnParser(0).GetOptionalInt32();
+ TMaybe<TInstant> leaseDeadline;
+
+ NYdb::TResultSetParser result2(ResultSets[1]);
+
+ if (result2.RowsCount() > 0) {
+ result2.TryNextRow();
+
+ leaseDeadline = result2.ColumnParser(0).GetOptionalTimestamp();
+ }
+
+ if (leaseDeadline) {
+ if (operationStatus) {
+ Finish(Ydb::StatusIds::INTERNAL_ERROR, "Invalid operation state");
+ } else if (*leaseDeadline < RunStartTime) {
+ FinishWasRun = true;
+ FinishScriptExecution(Database, ExecutionId, StatusOnExpiredLease, Ydb::Query::EXEC_STATUS_ABORTED, "Lease expired");
+ } else {
+ // OperationStatus is Nothing(): currently running
+ Finish();
+ }
+ } else if (operationStatus) {
+ OperationStatus = static_cast<Ydb::StatusIds::StatusCode>(*operationStatus);
+ Finish();
+ } else {
+ Finish(Ydb::StatusIds::INTERNAL_ERROR, "Invalid operation state");
+ }
+ } else {
+ OperationStatus = StatusOnExpiredLease;
+ Finish();
+ }
+ }
+
+ void OnFinish(Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues) override {
+ Send(Owner, new TEvPrivate::TEvLeaseCheckResult(status, std::move(issues), OperationStatus));
+ }
+
+private:
+ const TInstant RunStartTime = TInstant::Now();
+ const TString Database;
+ const TString ExecutionId;
+ const Ydb::StatusIds::StatusCode StatusOnExpiredLease;
+ TMaybe<Ydb::StatusIds::StatusCode> OperationStatus;
+ bool FinishWasRun = false;
+};
+
+class TCancelScriptExecutionOperationActor : public NActors::TActorBootstrapped<TCancelScriptExecutionOperationActor> {
+public:
+ TCancelScriptExecutionOperationActor(TEvCancelScriptExecutionOperation::TPtr ev)
+ : Request(std::move(ev))
+ {}
+
+ void Bootstrap() {
+ const TMaybe<TString> executionId = NKqp::ScriptExecutionFromOperation(Request->Get()->OperationId);
+ if (!executionId) {
+ return Reply(Ydb::StatusIds::BAD_REQUEST, "Incorrect operation id");
+ }
+ ExecutionId = *executionId;
+
+ if (!NKqp::ScriptExecutionIdToActorId(ExecutionId, RunScriptActor)) {
+ return Reply(Ydb::StatusIds::BAD_REQUEST, "Incorrect operation id");
+ }
+
+ Become(&TCancelScriptExecutionOperationActor::StateFunc);
+ Register(new TCheckLeaseStatusActor(Request->Get()->Database, ExecutionId));
+ }
+
+ STRICT_STFUNC(StateFunc,
+ hFunc(TEvPrivate::TEvLeaseCheckResult, Handle);
+ hFunc(TEvKqp::TEvCancelScriptExecutionResponse, Handle);
+ hFunc(NActors::TEvents::TEvUndelivered, Handle);
+ hFunc(NActors::TEvInterconnect::TEvNodeDisconnected, Handle);
+ )
+
+ void Handle(TEvPrivate::TEvLeaseCheckResult::TPtr& ev) {
+ if (ev->Get()->Status == Ydb::StatusIds::SUCCESS) {
+ if (ev->Get()->OperationStatus) {
+ Reply(Ydb::StatusIds::PRECONDITION_FAILED); // Already finished.
+ } else {
+ if (CancelSent) { // We have not found the actor, but after it status of the operation is not defined, something strage happened.
+ Reply(Ydb::StatusIds::INTERNAL_ERROR, "Failed to cancel script execution operation");
+ } else {
+ SendCancelToRunScriptActor(); // The race: operation is still working, but it can finish before it receives cancel signal. Try to cancel first and then maybe check its status.
+ }
+ }
+ } else {
+ Reply(ev->Get()->Status, std::move(ev->Get()->Issues)); // Error getting operation in database.
+ }
+ }
+
+ void SendCancelToRunScriptActor() {
+ ui64 flags = IEventHandle::FlagTrackDelivery;
+ if (RunScriptActor.NodeId() != SelfId().NodeId()) {
+ flags |= IEventHandle::FlagSubscribeOnSession;
+ SubscribedOnSession = RunScriptActor.NodeId();
+ }
+ Send(RunScriptActor, new TEvKqp::TEvCancelScriptExecutionRequest(), flags);
+ CancelSent = true;
+ }
+
+ void Handle(TEvKqp::TEvCancelScriptExecutionResponse::TPtr& ev) {
+ NYql::TIssues issues;
+ NYql::IssuesFromMessage(ev->Get()->Record.GetIssues(), issues);
+ Reply(ev->Get()->Record.GetStatus(), std::move(issues));
+ }
+
+ void Handle(NActors::TEvents::TEvUndelivered::TPtr& ev) {
+ if (ev->Get()->Reason == NActors::TEvents::TEvUndelivered::ReasonActorUnknown) { // The actor probably had finished before our cancel message arrived.
+ Register(new TCheckLeaseStatusActor(Request->Get()->Database, ExecutionId)); // Check if the operation has finished.
+ } else {
+ Reply(Ydb::StatusIds::UNAVAILABLE, "Failed to deliver request to destination");
+ }
+ }
+
+ void Handle(NActors::TEvInterconnect::TEvNodeDisconnected::TPtr&) {
+ Reply(Ydb::StatusIds::UNAVAILABLE, "Failed to deliver request to destination");
+ }
+
+ void Reply(Ydb::StatusIds::StatusCode status, NYql::TIssues issues = {}) {
+ Send(Request->Sender, new TEvCancelScriptExecutionOperationResponse(status, std::move(issues)));
+ PassAway();
+ }
+
+ void Reply(Ydb::StatusIds::StatusCode status, const TString& message) {
+ NYql::TIssues issues;
+ issues.AddIssue(message);
+ Reply(status, std::move(issues));
+ }
+
+ void PassAway() override {
+ if (SubscribedOnSession) {
+ Send(TActivationContext::InterconnectProxy(*SubscribedOnSession), new TEvents::TEvUnsubscribe());
+ }
+ NActors::TActorBootstrapped<TCancelScriptExecutionOperationActor>::PassAway();
+ }
+
+private:
+ TEvCancelScriptExecutionOperation::TPtr Request;
+ TString ExecutionId;
+ NActors::TActorId RunScriptActor;
+ TMaybe<ui32> SubscribedOnSession;
+ bool CancelSent = false;
+};
+
} // anonymous namespace
NActors::IActor* CreateScriptExecutionCreatorActor(TEvKqp::TEvScriptRequest::TPtr&& ev) {
@@ -977,4 +1184,8 @@ NActors::IActor* CreateListScriptExecutionOperationsActor(TEvListScriptExecution
return new TListScriptExecutionOperationsActor(std::move(ev));
}
+NActors::IActor* CreateCancelScriptExecutionOperationActor(TEvCancelScriptExecutionOperation::TPtr ev) {
+ return new TCancelScriptExecutionOperationActor(std::move(ev));
+}
+
} // namespace NKikimr::NKqp
diff --git a/ydb/core/kqp/proxy_service/kqp_script_executions.h b/ydb/core/kqp/proxy_service/kqp_script_executions.h
index 6022a1f5d7..11867d5781 100644
--- a/ydb/core/kqp/proxy_service/kqp_script_executions.h
+++ b/ydb/core/kqp/proxy_service/kqp_script_executions.h
@@ -12,11 +12,6 @@
namespace NKikimr::NKqp {
-// Helpers.
-TString ScriptExecutionOperationFromExecutionId(const TString& executionId);
-TMaybe<TString> ScriptExecutionFromOperation(const TString& operationId);
-TMaybe<TString> ScriptExecutionFromOperation(const NOperationId::TOperationId& operationId);
-
// Creates all needed tables.
// Sends result event back when the work is done.
NActors::IActor* CreateScriptExecutionsTablesCreator(THolder<NActors::IEventBase> resultEvent);
@@ -27,6 +22,7 @@ NActors::IActor* CreateScriptExecutionCreatorActor(TEvKqp::TEvScriptRequest::TPt
// Operation API impl.
NActors::IActor* CreateGetScriptExecutionOperationActor(TEvGetScriptExecutionOperation::TPtr ev);
NActors::IActor* CreateListScriptExecutionOperationsActor(TEvListScriptExecutionOperations::TPtr ev);
+NActors::IActor* CreateCancelScriptExecutionOperationActor(TEvCancelScriptExecutionOperation::TPtr ev);
// Updates status in database.
NActors::IActor* CreateScriptExecutionFinisher(
diff --git a/ydb/core/kqp/run_script_actor/CMakeLists.darwin-x86_64.txt b/ydb/core/kqp/run_script_actor/CMakeLists.darwin-x86_64.txt
index 0359b1c93e..a612f9027d 100644
--- a/ydb/core/kqp/run_script_actor/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/kqp/run_script_actor/CMakeLists.darwin-x86_64.txt
@@ -17,6 +17,7 @@ target_link_libraries(core-kqp-run_script_actor PUBLIC
cpp-actors-core
ydb-core-base
ydb-core-protos
+ kqp-common-events
core-kqp-executer_actor
api-protos
)
diff --git a/ydb/core/kqp/run_script_actor/CMakeLists.linux-aarch64.txt b/ydb/core/kqp/run_script_actor/CMakeLists.linux-aarch64.txt
index fb65e2fbd0..fea0e3d0a0 100644
--- a/ydb/core/kqp/run_script_actor/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/kqp/run_script_actor/CMakeLists.linux-aarch64.txt
@@ -18,6 +18,7 @@ target_link_libraries(core-kqp-run_script_actor PUBLIC
cpp-actors-core
ydb-core-base
ydb-core-protos
+ kqp-common-events
core-kqp-executer_actor
api-protos
)
diff --git a/ydb/core/kqp/run_script_actor/CMakeLists.linux-x86_64.txt b/ydb/core/kqp/run_script_actor/CMakeLists.linux-x86_64.txt
index fb65e2fbd0..fea0e3d0a0 100644
--- a/ydb/core/kqp/run_script_actor/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/kqp/run_script_actor/CMakeLists.linux-x86_64.txt
@@ -18,6 +18,7 @@ target_link_libraries(core-kqp-run_script_actor PUBLIC
cpp-actors-core
ydb-core-base
ydb-core-protos
+ kqp-common-events
core-kqp-executer_actor
api-protos
)
diff --git a/ydb/core/kqp/run_script_actor/CMakeLists.windows-x86_64.txt b/ydb/core/kqp/run_script_actor/CMakeLists.windows-x86_64.txt
index 0359b1c93e..a612f9027d 100644
--- a/ydb/core/kqp/run_script_actor/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/kqp/run_script_actor/CMakeLists.windows-x86_64.txt
@@ -17,6 +17,7 @@ target_link_libraries(core-kqp-run_script_actor PUBLIC
cpp-actors-core
ydb-core-base
ydb-core-protos
+ kqp-common-events
core-kqp-executer_actor
api-protos
)
diff --git a/ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp b/ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp
index f046094109..d440065241 100644
--- a/ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp
+++ b/ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp
@@ -1,6 +1,7 @@
#include "kqp_run_script_actor.h"
#include <ydb/core/base/kikimr_issue.h>
+#include <ydb/core/kqp/common/events/events.h>
#include <ydb/core/kqp/common/kqp.h>
#include <ydb/core/kqp/executer_actor/kqp_executer.h>
#include <ydb/core/kqp/proxy_service/kqp_script_executions.h>
@@ -14,6 +15,8 @@
#include <util/generic/size_literals.h>
+#include <forward_list>
+
#define LOG_T(stream) LOG_TRACE_S(NActors::TActivationContext::AsActorContext(), NKikimrServices::KQP_EXECUTER, SelfId() << " " << stream);
#define LOG_D(stream) LOG_DEBUG_S(NActors::TActivationContext::AsActorContext(), NKikimrServices::KQP_EXECUTER, SelfId() << " " << stream);
#define LOG_I(stream) LOG_INFO_S(NActors::TActivationContext::AsActorContext(), NKikimrServices::KQP_EXECUTER, SelfId() << " " << stream);
@@ -28,6 +31,13 @@ constexpr ui64 RESULT_SIZE_LIMIT = 10_MB;
constexpr int RESULT_ROWS_LIMIT = 1000;
class TRunScriptActor : public NActors::TActorBootstrapped<TRunScriptActor> {
+ enum class ERunState {
+ Created,
+ Running,
+ Cancelling,
+ Cancelled,
+ Finished,
+ };
public:
TRunScriptActor(const NKikimrKqp::TEvQueryRequest& request, const TString& database, ui64 leaseGeneration)
: Request(request)
@@ -45,34 +55,89 @@ private:
STRICT_STFUNC(StateFunc,
hFunc(NActors::TEvents::TEvWakeup, Handle);
hFunc(NActors::TEvents::TEvPoison, Handle);
- hFunc(NKqp::TEvKqpExecuter::TEvStreamData, Handle);
- hFunc(NKqp::TEvKqp::TEvQueryResponse, Handle);
- hFunc(NKqp::TEvKqp::TEvFetchScriptResultsRequest, Handle);
+ hFunc(TEvKqpExecuter::TEvStreamData, Handle);
+ hFunc(TEvKqp::TEvQueryResponse, Handle);
+ hFunc(TEvKqp::TEvFetchScriptResultsRequest, Handle);
+ hFunc(TEvKqp::TEvCreateSessionResponse, Handle);
+ IgnoreFunc(TEvKqp::TEvCloseSessionResponse);
+ hFunc(TEvKqp::TEvCancelQueryResponse, Handle);
+ hFunc(TEvKqp::TEvCancelScriptExecutionRequest, Handle);
+ hFunc(TEvScriptExecutionFinished, Handle);
)
+ void SendToKqpProxy(THolder<NActors::IEventBase> ev) {
+ if (!Send(MakeKqpProxyID(SelfId().NodeId()), ev.Release())) {
+ Issues.AddIssue(MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR, "Internal error"));
+ Finish(Ydb::StatusIds::INTERNAL_ERROR);
+ }
+ }
+
+ void CreateSession() {
+ auto ev = MakeHolder<TEvKqp::TEvCreateSessionRequest>();
+ ev->Record.MutableRequest()->SetDatabase(Request.GetRequest().GetDatabase());
+
+ SendToKqpProxy(std::move(ev));
+ }
+
+ void Handle(TEvKqp::TEvCreateSessionResponse::TPtr& ev) {
+ const auto& resp = ev->Get()->Record;
+ if (resp.GetYdbStatus() != Ydb::StatusIds::SUCCESS) {
+ if (resp.GetResourceExhausted()) {
+ Finish(Ydb::StatusIds::OVERLOADED);
+ } else {
+ Finish(Ydb::StatusIds::INTERNAL_ERROR);
+ }
+ } else {
+ SessionId = resp.GetResponse().GetSessionId();
+
+ if (RunState == ERunState::Running) {
+ Start();
+ } else {
+ CloseSession();
+ }
+ }
+ }
+
+ void CloseSession() {
+ if (SessionId) {
+ auto ev = MakeHolder<TEvKqp::TEvCloseSessionRequest>();
+ ev->Record.MutableRequest()->SetSessionId(SessionId);
+
+ Send(MakeKqpProxyID(SelfId().NodeId()), ev.Release());
+ SessionId.clear();
+ }
+ }
+
void Start() {
- auto ev = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>();
+ auto ev = MakeHolder<TEvKqp::TEvQueryRequest>();
ev->Record = Request;
+ ev->Record.MutableRequest()->SetSessionId(SessionId);
NActors::ActorIdToProto(SelfId(), ev->Record.MutableRequestActorId());
- if (!Send(NKqp::MakeKqpProxyID(SelfId().NodeId()), ev.Release())) {
- Issues.AddIssue(MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR, "Internal error"));
- Finish(Ydb::StatusIds::INTERNAL_ERROR);
- }
+ SendToKqpProxy(std::move(ev));
}
// TODO: remove this after there will be a normal way to store results and generate execution id
void Handle(NActors::TEvents::TEvWakeup::TPtr&) {
- Start();
+ if (RunState == ERunState::Created) {
+ RunState = ERunState::Running;
+ CreateSession();
+ }
}
+ // Event in case of error in registering script in database
+ // Just pass away, because we have not started execution.
void Handle(NActors::TEvents::TEvPoison::TPtr&) {
+ Y_VERIFY(RunState == ERunState::Created);
PassAway();
}
- void Handle(NKqp::TEvKqpExecuter::TEvStreamData::TPtr& ev) {
- auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>();
+ void Handle(TEvKqpExecuter::TEvStreamData::TPtr& ev) {
+ if (RunState != ERunState::Running) {
+ return;
+ }
+ auto resp = MakeHolder<TEvKqpExecuter::TEvStreamDataAck>();
resp->Record.SetSeqNo(ev->Get()->Record.GetSeqNo());
resp->Record.SetFreeSpace(RESULT_SIZE_LIMIT);
@@ -87,7 +152,10 @@ private:
}
}
- void Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev) {
+ void Handle(TEvKqp::TEvQueryResponse::TPtr& ev) {
+ if (RunState != ERunState::Running) {
+ return;
+ }
auto& record = ev->Get()->Record.GetRef();
const auto& issueMessage = record.GetResponse().GetQueryIssues();
@@ -96,11 +164,16 @@ private:
Finish(record.GetYdbStatus());
}
- void Handle(NKqp::TEvKqp::TEvFetchScriptResultsRequest::TPtr& ev) {
- auto resp = MakeHolder<NKqp::TEvKqp::TEvFetchScriptResultsResponse>();
+ void Handle(TEvKqp::TEvFetchScriptResultsRequest::TPtr& ev) {
+ auto resp = MakeHolder<TEvKqp::TEvFetchScriptResultsResponse>();
if (!IsFinished()) {
- resp->Record.SetStatus(Ydb::StatusIds::BAD_REQUEST);
- resp->Record.AddIssues()->set_message("Results are not ready");
+ if (RunState == ERunState::Created || RunState == ERunState::Running) {
+ resp->Record.SetStatus(Ydb::StatusIds::BAD_REQUEST);
+ resp->Record.AddIssues()->set_message("Results are not ready");
+ } else if (RunState == ERunState::Cancelled || RunState == ERunState::Cancelling) {
+ resp->Record.SetStatus(Ydb::StatusIds::BAD_REQUEST);
+ resp->Record.AddIssues()->set_message("Script execution is cancelled");
+ }
} else {
if (!ResultSets.empty()) {
resp->Record.SetResultSetIndex(0);
@@ -126,7 +199,55 @@ private:
Send(ev->Sender, std::move(resp));
}
- void MergeResultSet(NKqp::TEvKqpExecuter::TEvStreamData::TPtr& ev) {
+ void Handle(TEvKqp::TEvCancelScriptExecutionRequest::TPtr& ev) {
+ switch (RunState) {
+ case ERunState::Created:
+ CancelRequests.emplace_front(std::move(ev));
+ Finish(Ydb::StatusIds::CANCELLED, ERunState::Cancelling);
+ break;
+ case ERunState::Running:
+ CancelRequests.emplace_front(std::move(ev));
+ CancelRunningQuery();
+ break;
+ case ERunState::Cancelling:
+ CancelRequests.emplace_front(std::move(ev));
+ break;
+ case ERunState::Cancelled:
+ Send(ev->Sender, new TEvKqp::TEvCancelScriptExecutionResponse(Ydb::StatusIds::PRECONDITION_FAILED, "Already cancelled"));
+ break;
+ case ERunState::Finished:
+ Send(ev->Sender, new TEvKqp::TEvCancelScriptExecutionResponse(Ydb::StatusIds::PRECONDITION_FAILED, "Already finished"));
+ break;
+ }
+ }
+
+ void CancelRunningQuery() {
+ if (SessionId.empty()) {
+ Finish(Ydb::StatusIds::CANCELLED, ERunState::Cancelling);
+ } else {
+ RunState = ERunState::Cancelling;
+ auto ev = MakeHolder<TEvKqp::TEvCancelQueryRequest>();
+ ev->Record.MutableRequest()->SetSessionId(SessionId);
+
+ Send(MakeKqpProxyID(SelfId().NodeId()), ev.Release());
+ }
+ }
+
+ void Handle(TEvKqp::TEvCancelQueryResponse::TPtr&) {
+ Finish(Ydb::StatusIds::CANCELLED, ERunState::Cancelling);
+ }
+
+ void Handle(TEvScriptExecutionFinished::TPtr& ev) {
+ if (RunState == ERunState::Cancelling) {
+ RunState = ERunState::Cancelled;
+ for (auto& req : CancelRequests) {
+ Send(req->Sender, new TEvKqp::TEvCancelScriptExecutionResponse(ev->Get()->Status, ev->Get()->Issues));
+ }
+ CancelRequests.clear();
+ }
+ }
+
+ void MergeResultSet(TEvKqpExecuter::TEvStreamData::TPtr& ev) {
if (ResultSets.empty()) {
ResultSets.emplace_back(ev->Get()->Record.GetResultSet());
return;
@@ -153,18 +274,31 @@ private:
static Ydb::Query::ExecStatus GetExecStatusFromStatusCode(Ydb::StatusIds::StatusCode status) {
if (status == Ydb::StatusIds::SUCCESS) {
return Ydb::Query::EXEC_STATUS_COMPLETED;
+ } else if (status == Ydb::StatusIds::CANCELLED) {
+ return Ydb::Query::EXEC_STATUS_CANCELLED;
} else {
return Ydb::Query::EXEC_STATUS_FAILED;
}
}
- void Finish(Ydb::StatusIds::StatusCode status) {
+ void Finish(Ydb::StatusIds::StatusCode status, ERunState runState = ERunState::Finished) {
+ RunState = runState;
Status = status;
Register(CreateScriptExecutionFinisher(ActorIdToScriptExecutionId(SelfId()), Database, LeaseGeneration, status, GetExecStatusFromStatusCode(status), Issues));
+ if (RunState == ERunState::Cancelling) {
+ Issues.AddIssue("Script execution is cancelled");
+ ResultSets.clear();
+ }
+ CloseSession();
+ }
+
+ void PassAway() override {
+ CloseSession();
+ NActors::TActorBootstrapped<TRunScriptActor>::PassAway();
}
bool IsFinished() const {
- return Status != Ydb::StatusIds::STATUS_CODE_UNSPECIFIED;
+ return RunState == ERunState::Finished;
}
bool IsTruncated() const {
@@ -175,6 +309,9 @@ private:
const NKikimrKqp::TEvQueryRequest Request;
const TString Database;
const ui64 LeaseGeneration;
+ TString SessionId;
+ ERunState RunState = ERunState::Created;
+ std::forward_list<TEvKqp::TEvCancelScriptExecutionRequest::TPtr> CancelRequests;
// Result
NYql::TIssues Issues;
diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
index e698bde798..83b35a1ba7 100644
--- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp
+++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
@@ -1749,6 +1749,21 @@ public:
Cleanup(IsFatalError(ydbStatus));
}
+ void Handle(TEvKqp::TEvCancelQueryRequest::TPtr& ev) {
+ {
+ auto abort = MakeHolder<NYql::NDq::TEvDq::TEvAbortExecution>();
+ abort->Record.SetStatusCode(NYql::NDqProto::StatusIds::CANCELLED);
+ abort->Record.AddIssues()->set_message("Canceled");
+ Send(SelfId(), abort.Release());
+ }
+
+ {
+ auto resp = MakeHolder<TEvKqp::TEvCancelQueryResponse>();
+ resp->Record.SetStatus(Ydb::StatusIds::SUCCESS);
+ Send(ev->Sender, resp.Release(), 0, ev->Cookie);
+ }
+ }
+
STFUNC(ReadyState) {
try {
switch (ev->GetTypeRewrite()) {
@@ -1757,6 +1772,7 @@ public:
hFunc(TEvKqp::TEvCloseSessionRequest, HandleReady);
hFunc(TEvKqp::TEvInitiateSessionShutdown, Handle);
hFunc(TEvKqp::TEvContinueShutdown, Handle);
+ hFunc(TEvKqp::TEvCancelQueryRequest, Handle);
// forgotten messages from previous aborted request
hFunc(TEvKqp::TEvCompileResponse, HandleNoop);
@@ -1785,6 +1801,7 @@ public:
hFunc(TEvKqp::TEvInitiateSessionShutdown, Handle);
hFunc(TEvKqp::TEvContinueShutdown, Handle);
hFunc(NGRpcService::TEvClientLost, HandleClientLost);
+ hFunc(TEvKqp::TEvCancelQueryRequest, Handle);
// forgotten messages from previous aborted request
hFunc(TEvKqpExecuter::TEvTxResponse, HandleNoop);
@@ -1815,6 +1832,7 @@ public:
hFunc(TEvKqp::TEvInitiateSessionShutdown, Handle);
hFunc(TEvKqp::TEvContinueShutdown, Handle);
hFunc(NGRpcService::TEvClientLost, HandleClientLost);
+ hFunc(TEvKqp::TEvCancelQueryRequest, Handle);
// forgotten messages from previous aborted request
hFunc(TEvKqp::TEvCompileResponse, Handle);
@@ -1844,6 +1862,7 @@ public:
hFunc(TEvKqp::TEvInitiateSessionShutdown, Handle);
hFunc(TEvKqp::TEvContinueShutdown, Handle);
hFunc(NGRpcService::TEvClientLost, HandleNoop);
+ hFunc(TEvKqp::TEvCancelQueryRequest, HandleNoop);
// forgotten messages from previous aborted request
hFunc(TEvKqp::TEvCompileResponse, HandleNoop);
diff --git a/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp b/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp
index cf65f59613..e73b0b6017 100644
--- a/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp
+++ b/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp
@@ -302,6 +302,40 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
UNIT_ASSERT_VALUES_EQUAL(listed, ScriptExecutionsCount);
UNIT_ASSERT_EQUAL(ops, listedOps);
}
+
+ Y_UNIT_TEST(CancelScriptExecution) {
+ auto kikimr = DefaultKikimrRunner();
+ auto db = kikimr.GetQueryClient();
+
+ auto scriptExecutionOperation = db.ExecuteScript(R"(
+ SELECT * FROM EightShard WHERE Text = "Value2" AND Data = 1 ORDER BY Key;
+ SELECT * FROM TwoShard WHERE Key < 10 ORDER BY Key;
+ )").ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(scriptExecutionOperation.Status().GetStatus(), EStatus::SUCCESS, scriptExecutionOperation.Status().GetIssues().ToString());
+ UNIT_ASSERT(scriptExecutionOperation.Metadata().ExecutionId);
+
+ NYdb::NOperation::TOperationClient opClient(kikimr.GetDriver());
+ std::vector<NYdb::TAsyncStatus> cancelFutures(3);
+ // Check races also
+ for (auto& f : cancelFutures) {
+ f = opClient.Cancel(scriptExecutionOperation.Id());
+ }
+
+ for (auto& f : cancelFutures) {
+ auto cancelStatus = f.ExtractValueSync();
+ UNIT_ASSERT_C(cancelStatus.GetStatus() == NYdb::EStatus::SUCCESS || cancelStatus.GetStatus() == NYdb::EStatus::PRECONDITION_FAILED, cancelStatus.GetIssues().ToString());
+ }
+
+ auto op = opClient.Get<NYdb::NQuery::TScriptExecutionOperation>(scriptExecutionOperation.Id()).ExtractValueSync();
+ UNIT_ASSERT(op.Ready());
+ UNIT_ASSERT(op.Metadata().ExecStatus == EExecStatus::Completed || op.Metadata().ExecStatus == EExecStatus::Canceled);
+ UNIT_ASSERT_EQUAL(op.Metadata().ExecutionId, scriptExecutionOperation.Metadata().ExecutionId);
+ UNIT_ASSERT(op.Status().GetStatus() == NYdb::EStatus::SUCCESS || op.Status().GetStatus() == NYdb::EStatus::CANCELLED);
+
+ // Check cancel for completed query
+ auto cancelStatus = opClient.Cancel(scriptExecutionOperation.Id()).ExtractValueSync();
+ UNIT_ASSERT_C(cancelStatus.GetStatus() == NYdb::EStatus::PRECONDITION_FAILED, cancelStatus.GetIssues().ToString());
+ }
}
} // namespace NKqp
diff --git a/ydb/core/protos/kqp.proto b/ydb/core/protos/kqp.proto
index 2c155c661f..6b29828fec 100644
--- a/ydb/core/protos/kqp.proto
+++ b/ydb/core/protos/kqp.proto
@@ -345,6 +345,20 @@ message TEvDataQueryStreamPart {
repeated NKikimrMiniKQL.TResult Results = 2;
};
+message TCancelQueryRequest {
+ optional bytes SessionId = 1;
+};
+
+message TEvCancelQueryRequest {
+ optional TCancelQueryRequest Request = 1;
+ optional string TraceId = 2;
+};
+
+message TEvCancelQueryResponse {
+ optional Ydb.StatusIds.StatusCode Status = 1;
+ repeated Ydb.Issue.IssueMessage Issues = 2;
+}
+
// Executer
message TExecuterTxRequest {
@@ -612,3 +626,12 @@ message TEvFetchScriptResultsResponse {
optional uint64 ResultSetIndex = 3;
optional Ydb.ResultSet ResultSet = 4;
}
+
+// Request that is sent to run script actor to cancel execution and write finish status to database.
+message TEvCancelScriptExecutionRequest {
+}
+
+message TEvCancelScriptExecutionResponse {
+ optional Ydb.StatusIds.StatusCode Status = 1;
+ repeated Ydb.Issue.IssueMessage Issues = 2;
+}