diff options
author | galaxycrab <UgnineSirdis@ydb.tech> | 2023-06-05 15:11:52 +0300 |
---|---|---|
committer | galaxycrab <UgnineSirdis@ydb.tech> | 2023-06-05 15:11:52 +0300 |
commit | 45c174f12cdc1defbcec3574d224d9d5be4e5b95 (patch) | |
tree | 80967404904595cd733b7e0b58dd928f8034dbf1 | |
parent | 4e8aaf987f81f9c46ccee9a9f2943432d816365d (diff) | |
download | ydb-45c174f12cdc1defbcec3574d224d9d5be4e5b95.tar.gz |
Support CancelOperation for ScriptExecution operations
32 files changed, 749 insertions, 108 deletions
diff --git a/ydb/core/grpc_services/CMakeLists.darwin-x86_64.txt b/ydb/core/grpc_services/CMakeLists.darwin-x86_64.txt index 38a8b69bd3..9ba2c54c10 100644 --- a/ydb/core/grpc_services/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/grpc_services/CMakeLists.darwin-x86_64.txt @@ -40,6 +40,7 @@ target_link_libraries(ydb-core-grpc_services PUBLIC ydb-core-health_check ydb-core-io_formats core-kesus-tablet + core-kqp-common ydb-core-protos ydb-core-scheme ydb-core-sys_view diff --git a/ydb/core/grpc_services/CMakeLists.linux-aarch64.txt b/ydb/core/grpc_services/CMakeLists.linux-aarch64.txt index 2b00cd491b..94522095a3 100644 --- a/ydb/core/grpc_services/CMakeLists.linux-aarch64.txt +++ b/ydb/core/grpc_services/CMakeLists.linux-aarch64.txt @@ -41,6 +41,7 @@ target_link_libraries(ydb-core-grpc_services PUBLIC ydb-core-health_check ydb-core-io_formats core-kesus-tablet + core-kqp-common ydb-core-protos ydb-core-scheme ydb-core-sys_view diff --git a/ydb/core/grpc_services/CMakeLists.linux-x86_64.txt b/ydb/core/grpc_services/CMakeLists.linux-x86_64.txt index 2b00cd491b..94522095a3 100644 --- a/ydb/core/grpc_services/CMakeLists.linux-x86_64.txt +++ b/ydb/core/grpc_services/CMakeLists.linux-x86_64.txt @@ -41,6 +41,7 @@ target_link_libraries(ydb-core-grpc_services PUBLIC ydb-core-health_check ydb-core-io_formats core-kesus-tablet + core-kqp-common ydb-core-protos ydb-core-scheme ydb-core-sys_view diff --git a/ydb/core/grpc_services/CMakeLists.windows-x86_64.txt b/ydb/core/grpc_services/CMakeLists.windows-x86_64.txt index 38a8b69bd3..9ba2c54c10 100644 --- a/ydb/core/grpc_services/CMakeLists.windows-x86_64.txt +++ b/ydb/core/grpc_services/CMakeLists.windows-x86_64.txt @@ -40,6 +40,7 @@ target_link_libraries(ydb-core-grpc_services PUBLIC ydb-core-health_check ydb-core-io_formats core-kesus-tablet + core-kqp-common ydb-core-protos ydb-core-scheme ydb-core-sys_view diff --git a/ydb/core/grpc_services/rpc_cancel_operation.cpp b/ydb/core/grpc_services/rpc_cancel_operation.cpp index 
769f4f38e6..83321a9179 100644 --- a/ydb/core/grpc_services/rpc_cancel_operation.cpp +++ b/ydb/core/grpc_services/rpc_cancel_operation.cpp @@ -1,10 +1,13 @@ #include "service_operation.h" #include "operation_helpers.h" #include "rpc_operation_request_base.h" +#include <ydb/core/kqp/common/events/script_executions.h> +#include <ydb/core/kqp/common/kqp.h> #include <ydb/core/grpc_services/base/base.h> #include <ydb/core/tx/schemeshard/schemeshard_build_index.h> #include <ydb/core/tx/schemeshard/schemeshard_export.h> #include <ydb/core/tx/schemeshard/schemeshard_import.h> +#include <ydb/library/yql/public/issue/yql_issue_message.h> #include <ydb/public/lib/operation_id/operation_id.h> #include <library/cpp/actors/core/hfunc.h> @@ -30,6 +33,8 @@ class TCancelOperationRPC: public TRpcOperationRequestActor<TCancelOperationRPC, return "[CancelImport]"; case TOperationId::BUILD_INDEX: return "[CancelIndexBuild]"; + case TOperationId::SCRIPT_EXECUTION: + return "[CancelScriptExecution]"; default: return "[Untagged]"; } @@ -48,6 +53,13 @@ class TCancelOperationRPC: public TRpcOperationRequestActor<TCancelOperationRPC, } } + bool NeedAllocateTxId() const { + const Ydb::TOperationId::EKind kind = OperationId.GetKind(); + return kind == TOperationId::EXPORT + || kind == TOperationId::IMPORT + || kind == TOperationId::BUILD_INDEX; + } + void Handle(TEvExport::TEvCancelExportResponse::TPtr& ev) { const auto& record = ev->Get()->Record.GetResponse(); @@ -93,11 +105,17 @@ public: } break; + case TOperationId::SCRIPT_EXECUTION: + SendCancelScriptExecutionOperation(); + break; + default: return Reply(StatusIds::UNSUPPORTED, TIssuesIds::DEFAULT_ERROR, "Unknown operation kind"); } - AllocateTxId(); + if (NeedAllocateTxId()) { + AllocateTxId(); + } } catch (const yexception&) { return Reply(StatusIds::BAD_REQUEST, TIssuesIds::DEFAULT_ERROR, "Invalid operation id"); } @@ -110,15 +128,25 @@ public: hFunc(TEvExport::TEvCancelExportResponse, Handle); hFunc(TEvImport::TEvCancelImportResponse, 
Handle); hFunc(TEvIndexBuilder::TEvCancelResponse, Handle); + hFunc(NKqp::TEvCancelScriptExecutionOperationResponse, Handle); default: return StateBase(ev); } } + void Handle(NKqp::TEvCancelScriptExecutionOperationResponse::TPtr& ev) { + google::protobuf::RepeatedPtrField<Ydb::Issue::IssueMessage> issuesProto; + NYql::IssuesToMessage(ev->Get()->Issues, &issuesProto); + Reply(ev->Get()->Status, issuesProto); + } + + void SendCancelScriptExecutionOperation() { + Send(NKqp::MakeKqpProxyID(SelfId().NodeId()), new NKqp::TEvCancelScriptExecutionOperation(DatabaseName, OperationId)); + } + private: TOperationId OperationId; ui64 RawOperationId = 0; - }; // TCancelOperationRPC void DoCancelOperationRequest(std::unique_ptr<IRequestNoOpCtx> p, const IFacilityProvider& f) { diff --git a/ydb/core/kqp/common/CMakeLists.darwin-x86_64.txt b/ydb/core/kqp/common/CMakeLists.darwin-x86_64.txt index c40b6b7675..99e0026d26 100644 --- a/ydb/core/kqp/common/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/kqp/common/CMakeLists.darwin-x86_64.txt @@ -46,6 +46,8 @@ target_link_libraries(core-kqp-common PUBLIC yql-dq-actors yql-dq-common parser-pg_wrapper-interface + public-lib-operation_id + lib-operation_id-protos core-grpc_services-cancelation library-cpp-lwtrace tools-enum_parser-enum_serialization_runtime @@ -55,6 +57,7 @@ target_sources(core-kqp-common PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_resolve.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_ru_calc.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_yql.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_script_executions.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_timeouts.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_lwtrace_probes.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_types.cpp diff --git a/ydb/core/kqp/common/CMakeLists.linux-aarch64.txt b/ydb/core/kqp/common/CMakeLists.linux-aarch64.txt index c469afdc8b..3dcd30969b 100644 --- a/ydb/core/kqp/common/CMakeLists.linux-aarch64.txt +++ 
b/ydb/core/kqp/common/CMakeLists.linux-aarch64.txt @@ -47,6 +47,8 @@ target_link_libraries(core-kqp-common PUBLIC yql-dq-actors yql-dq-common parser-pg_wrapper-interface + public-lib-operation_id + lib-operation_id-protos core-grpc_services-cancelation library-cpp-lwtrace tools-enum_parser-enum_serialization_runtime @@ -56,6 +58,7 @@ target_sources(core-kqp-common PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_resolve.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_ru_calc.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_yql.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_script_executions.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_timeouts.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_lwtrace_probes.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_types.cpp diff --git a/ydb/core/kqp/common/CMakeLists.linux-x86_64.txt b/ydb/core/kqp/common/CMakeLists.linux-x86_64.txt index c469afdc8b..3dcd30969b 100644 --- a/ydb/core/kqp/common/CMakeLists.linux-x86_64.txt +++ b/ydb/core/kqp/common/CMakeLists.linux-x86_64.txt @@ -47,6 +47,8 @@ target_link_libraries(core-kqp-common PUBLIC yql-dq-actors yql-dq-common parser-pg_wrapper-interface + public-lib-operation_id + lib-operation_id-protos core-grpc_services-cancelation library-cpp-lwtrace tools-enum_parser-enum_serialization_runtime @@ -56,6 +58,7 @@ target_sources(core-kqp-common PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_resolve.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_ru_calc.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_yql.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_script_executions.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_timeouts.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_lwtrace_probes.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_types.cpp diff --git a/ydb/core/kqp/common/CMakeLists.windows-x86_64.txt b/ydb/core/kqp/common/CMakeLists.windows-x86_64.txt index c40b6b7675..99e0026d26 100644 --- a/ydb/core/kqp/common/CMakeLists.windows-x86_64.txt +++ 
b/ydb/core/kqp/common/CMakeLists.windows-x86_64.txt @@ -46,6 +46,8 @@ target_link_libraries(core-kqp-common PUBLIC yql-dq-actors yql-dq-common parser-pg_wrapper-interface + public-lib-operation_id + lib-operation_id-protos core-grpc_services-cancelation library-cpp-lwtrace tools-enum_parser-enum_serialization_runtime @@ -55,6 +57,7 @@ target_sources(core-kqp-common PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_resolve.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_ru_calc.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_yql.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_script_executions.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_timeouts.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_lwtrace_probes.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_types.cpp diff --git a/ydb/core/kqp/common/events/events.h b/ydb/core/kqp/common/events/events.h index 9a0702f849..6a98198cb6 100644 --- a/ydb/core/kqp/common/events/events.h +++ b/ydb/core/kqp/common/events/events.h @@ -8,6 +8,7 @@ #include <ydb/core/kqp/common/shutdown/events.h> #include <ydb/public/api/protos/draft/ydb_query.pb.h> #include <ydb/library/yql/dq/actors/dq.h> +#include <ydb/library/yql/public/issue/yql_issue_message.h> #include <library/cpp/actors/core/event_pb.h> #include <library/cpp/actors/core/event_local.h> @@ -32,6 +33,9 @@ struct TEvKqp { struct TEvPingSessionRequest : public TEventPB<TEvPingSessionRequest, NKikimrKqp::TEvPingSessionRequest, TKqpEvents::EvPingSessionRequest> {}; + struct TEvCancelQueryRequest : public TEventPB<TEvCancelQueryRequest, + NKikimrKqp::TEvCancelQueryRequest, TKqpEvents::EvCancelQueryRequest> {}; + using TEvCompileRequest = NPrivateEvents::TEvCompileRequest; using TEvRecompileRequest = NPrivateEvents::TEvRecompileRequest; @@ -77,6 +81,9 @@ struct TEvKqp { struct TEvPingSessionResponse : public TEventPB<TEvPingSessionResponse, NKikimrKqp::TEvPingSessionResponse, TKqpEvents::EvPingSessionResponse> {}; + struct TEvCancelQueryResponse : public 
TEventPB<TEvCancelQueryResponse, + NKikimrKqp::TEvCancelQueryResponse, TKqpEvents::EvCancelQueryResponse> {}; + struct TEvKqpProxyPublishRequest : public TEventLocal<TEvKqpProxyPublishRequest, TKqpEvents::EvKqpProxyPublishRequest> {}; @@ -119,6 +126,29 @@ struct TEvKqp { struct TEvFetchScriptResultsResponse : public TEventPB<TEvFetchScriptResultsResponse, NKikimrKqp::TEvFetchScriptResultsResponse, TKqpEvents::EvFetchScriptResultsResponse> { }; + + struct TEvCancelScriptExecutionRequest : public TEventPB<TEvCancelScriptExecutionRequest, NKikimrKqp::TEvCancelScriptExecutionRequest, TKqpEvents::EvCancelScriptExecutionRequest> { + }; + + struct TEvCancelScriptExecutionResponse : public TEventPB<TEvCancelScriptExecutionResponse, NKikimrKqp::TEvCancelScriptExecutionResponse, TKqpEvents::EvCancelScriptExecutionResponse> { + TEvCancelScriptExecutionResponse() = default; + + explicit TEvCancelScriptExecutionResponse(Ydb::StatusIds::StatusCode status, const NYql::TIssues& issues = {}) { + Record.SetStatus(status); + NYql::IssuesToMessage(issues, Record.MutableIssues()); + } + + TEvCancelScriptExecutionResponse(Ydb::StatusIds::StatusCode status, const TString& message) + : TEvCancelScriptExecutionResponse(status, TextToIssues(message)) + {} + + private: + static NYql::TIssues TextToIssues(const TString& message) { + NYql::TIssues issues; + issues.AddIssue(message); + return issues; + } + }; }; } // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/common/events/script_executions.h b/ydb/core/kqp/common/events/script_executions.h index bc808241c0..56ba62daf6 100644 --- a/ydb/core/kqp/common/events/script_executions.h +++ b/ydb/core/kqp/common/events/script_executions.h @@ -66,4 +66,39 @@ struct TEvListScriptExecutionOperationsResponse : public NActors::TEventLocal<TE std::vector<Ydb::Operations::Operation> Operations; }; +struct TEvCancelScriptExecutionOperation : public NActors::TEventLocal<TEvCancelScriptExecutionOperation, 
TKqpScriptExecutionEvents::EvCancelScriptExecutionOperation> { + explicit TEvCancelScriptExecutionOperation(const TString& database, const NOperationId::TOperationId& id) + : Database(database) + , OperationId(id) + { + } + + TString Database; + NOperationId::TOperationId OperationId; +}; + +struct TEvCancelScriptExecutionOperationResponse : public NActors::TEventLocal<TEvCancelScriptExecutionOperationResponse, TKqpScriptExecutionEvents::EvCancelScriptExecutionOperationResponse> { + TEvCancelScriptExecutionOperationResponse() = default; + + TEvCancelScriptExecutionOperationResponse(Ydb::StatusIds::StatusCode status, NYql::TIssues issues = {}) + : Status(status) + , Issues(std::move(issues)) + { + } + + Ydb::StatusIds::StatusCode Status; + NYql::TIssues Issues; +}; + +struct TEvScriptExecutionFinished : public NActors::TEventLocal<TEvScriptExecutionFinished, TKqpScriptExecutionEvents::EvScriptExecutionFinished> { + TEvScriptExecutionFinished(Ydb::StatusIds::StatusCode status, NYql::TIssues issues = {}) + : Status(status) + , Issues(std::move(issues)) + { + } + + Ydb::StatusIds::StatusCode Status; + NYql::TIssues Issues; +}; + } // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/common/kqp_script_executions.cpp b/ydb/core/kqp/common/kqp_script_executions.cpp new file mode 100644 index 0000000000..f231dae077 --- /dev/null +++ b/ydb/core/kqp/common/kqp_script_executions.cpp @@ -0,0 +1,27 @@ +#include "kqp_script_executions.h" + +#include <ydb/public/lib/operation_id/protos/operation_id.pb.h> + +namespace NKikimr::NKqp { + +TString ScriptExecutionOperationFromExecutionId(const TString& executionId) { + Ydb::TOperationId operationId; + operationId.SetKind(Ydb::TOperationId::SCRIPT_EXECUTION); + NOperationId::AddOptionalValue(operationId, "actor_id", executionId); + return NOperationId::ProtoToString(operationId); +} + +TMaybe<TString> ScriptExecutionFromOperation(const TString& operationId) { + NOperationId::TOperationId operation(operationId); + return 
ScriptExecutionFromOperation(operation); +} + +TMaybe<TString> ScriptExecutionFromOperation(const NOperationId::TOperationId& operationId) { + const auto& values = operationId.GetValue("actor_id"); + if (values.empty() || !values[0]) { + return Nothing(); + } + return *values[0]; +} + +} // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/common/kqp_script_executions.h b/ydb/core/kqp/common/kqp_script_executions.h new file mode 100644 index 0000000000..d920d28b59 --- /dev/null +++ b/ydb/core/kqp/common/kqp_script_executions.h @@ -0,0 +1,14 @@ +#pragma once +#include <ydb/public/lib/operation_id/operation_id.h> + +#include <util/generic/string.h> +#include <util/generic/maybe.h> + + +namespace NKikimr::NKqp { + +TString ScriptExecutionOperationFromExecutionId(const TString& executionId); +TMaybe<TString> ScriptExecutionFromOperation(const TString& operationId); +TMaybe<TString> ScriptExecutionFromOperation(const NOperationId::TOperationId& operationId); + +} // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/common/simple/kqp_event_ids.h b/ydb/core/kqp/common/simple/kqp_event_ids.h index d21cd64c1e..a8faace1ea 100644 --- a/ydb/core/kqp/common/simple/kqp_event_ids.h +++ b/ydb/core/kqp/common/simple/kqp_event_ids.h @@ -35,6 +35,10 @@ struct TKqpEvents { EvFetchScriptResultsRequest, EvFetchScriptResultsResponse, EvKqpProxyPublishRequest, + EvCancelScriptExecutionRequest, + EvCancelScriptExecutionResponse, + EvCancelQueryRequest, + EvCancelQueryResponse, }; static_assert (EvCompileInvalidateRequest + 1 == EvAbortExecution); @@ -128,6 +132,9 @@ struct TKqpScriptExecutionEvents { EvGetScriptExecutionOperationResponse, EvListScriptExecutionOperations, EvListScriptExecutionOperationsResponse, + EvCancelScriptExecutionOperation, + EvCancelScriptExecutionOperationResponse, + EvScriptExecutionFinished, }; }; diff --git a/ydb/core/kqp/counters/kqp_counters.cpp b/ydb/core/kqp/counters/kqp_counters.cpp index 2e5c72f45f..fb3f5bcd43 100644 --- 
a/ydb/core/kqp/counters/kqp_counters.cpp +++ b/ydb/core/kqp/counters/kqp_counters.cpp @@ -73,6 +73,7 @@ void TKqpCountersBase::Init() { CloseSessionRequests = KqpGroup->GetCounter("Requests/CloseSession", true); CreateSessionRequests = KqpGroup->GetCounter("Requests/CreateSession", true); PingSessionRequests = KqpGroup->GetCounter("Requests/PingSession", true); + CancelQueryRequests = KqpGroup->GetCounter("Requests/CancelQuery", true); RequestBytes = KqpGroup->GetCounter("Requests/Bytes", true); YdbRequestBytes = YdbGroup->GetNamedCounter("name", "table.query.request.bytes", true); @@ -306,6 +307,12 @@ void TKqpCountersBase::ReportQueryRequest(ui64 requestBytes, ui64 parametersByte *YdbParametersBytes += parametersBytes; } +void TKqpCountersBase::ReportCancelQuery(ui64 requestSize) { + CancelQueryRequests->Inc(); + *RequestBytes += requestSize; + *YdbRequestBytes += requestSize; +} + void TKqpCountersBase::ReportQueryWithRangeScan() { QueriesWithRangeScan->Inc(); } @@ -860,6 +867,13 @@ void TKqpCounters::ReportQueryRequest(TKqpDbCountersPtr dbCounters, ui64 request } } +void TKqpCounters::ReportCancelQuery(TKqpDbCountersPtr dbCounters, ui64 requestSize) { + TKqpCountersBase::ReportCancelQuery(requestSize); + if (dbCounters) { + dbCounters->ReportCancelQuery(requestSize); + } +} + void TKqpCounters::ReportQueryWithRangeScan(TKqpDbCountersPtr dbCounters) { TKqpCountersBase::ReportQueryWithRangeScan(); if (dbCounters) { diff --git a/ydb/core/kqp/counters/kqp_counters.h b/ydb/core/kqp/counters/kqp_counters.h index 1a0e8cb80c..ec906a81fc 100644 --- a/ydb/core/kqp/counters/kqp_counters.h +++ b/ydb/core/kqp/counters/kqp_counters.h @@ -42,6 +42,7 @@ protected: void ReportPingSession(ui64 requestSize); void ReportCloseSession(ui64 requestSize); void ReportQueryRequest(ui64 requestBytes, ui64 parametersBytes, ui64 queryBytes); + void ReportCancelQuery(ui64 requestSize); void ReportQueryWithRangeScan(); void ReportQueryWithFullScan(); @@ -112,6 +113,7 @@ protected: 
::NMonitoring::TDynamicCounters::TCounterPtr CloseSessionRequests; ::NMonitoring::TDynamicCounters::TCounterPtr CreateSessionRequests; ::NMonitoring::TDynamicCounters::TCounterPtr PingSessionRequests; + ::NMonitoring::TDynamicCounters::TCounterPtr CancelQueryRequests; ::NMonitoring::TDynamicCounters::TCounterPtr RequestBytes; ::NMonitoring::TDynamicCounters::TCounterPtr YdbRequestBytes; @@ -272,6 +274,7 @@ public: void ReportQueryAction(TKqpDbCountersPtr dbCounters, NKikimrKqp::EQueryAction action); void ReportQueryType(TKqpDbCountersPtr dbCounters, NKikimrKqp::EQueryType type); void ReportQueryRequest(TKqpDbCountersPtr dbCounters, ui64 requestBytes, ui64 parametersBytes, ui64 queryBytes); + void ReportCancelQuery(TKqpDbCountersPtr dbCounters, ui64 requestSize); void ReportResponseStatus(TKqpDbCountersPtr dbCounters, ui64 responseSize, Ydb::StatusIds::StatusCode ydbStatus); void ReportResultsBytes(TKqpDbCountersPtr dbCounters, ui64 resultsSize); diff --git a/ydb/core/kqp/counters/kqp_db_counters.h b/ydb/core/kqp/counters/kqp_db_counters.h index d7ec834f5f..be45f77895 100644 --- a/ydb/core/kqp/counters/kqp_db_counters.h +++ b/ydb/core/kqp/counters/kqp_db_counters.h @@ -23,6 +23,7 @@ namespace NKqp { XX(DB_KQP_REQ_QUERY_OTHER, OtherQueryRequests) \ XX(DB_KQP_CLOSE_SESSION_REQ, CloseSessionRequests) \ XX(DB_KQP_PING_SESSION_REQ, PingSessionRequests) \ + XX(DB_KQP_CANCEL_QUERY_REQ, CancelQueryRequests) \ XX(DB_KQP_REQUEST_BYTES, RequestBytes) \ XX(DB_KQP_QUERY_BYTES, QueryBytes) \ XX(DB_KQP_PARAMS_BYTES, ParametersBytes) \ diff --git a/ydb/core/kqp/proxy_service/CMakeLists.darwin-x86_64.txt b/ydb/core/kqp/proxy_service/CMakeLists.darwin-x86_64.txt index 455ba7bfbd..2ce9ff4dd0 100644 --- a/ydb/core/kqp/proxy_service/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/kqp/proxy_service/CMakeLists.darwin-x86_64.txt @@ -21,6 +21,7 @@ target_link_libraries(core-kqp-proxy_service PUBLIC ydb-core-base core-cms-console core-kqp-common + kqp-common-events core-kqp-counters 
core-kqp-run_script_actor ydb-core-mind diff --git a/ydb/core/kqp/proxy_service/CMakeLists.linux-aarch64.txt b/ydb/core/kqp/proxy_service/CMakeLists.linux-aarch64.txt index 631a038f59..fc1b20b171 100644 --- a/ydb/core/kqp/proxy_service/CMakeLists.linux-aarch64.txt +++ b/ydb/core/kqp/proxy_service/CMakeLists.linux-aarch64.txt @@ -22,6 +22,7 @@ target_link_libraries(core-kqp-proxy_service PUBLIC ydb-core-base core-cms-console core-kqp-common + kqp-common-events core-kqp-counters core-kqp-run_script_actor ydb-core-mind diff --git a/ydb/core/kqp/proxy_service/CMakeLists.linux-x86_64.txt b/ydb/core/kqp/proxy_service/CMakeLists.linux-x86_64.txt index 631a038f59..fc1b20b171 100644 --- a/ydb/core/kqp/proxy_service/CMakeLists.linux-x86_64.txt +++ b/ydb/core/kqp/proxy_service/CMakeLists.linux-x86_64.txt @@ -22,6 +22,7 @@ target_link_libraries(core-kqp-proxy_service PUBLIC ydb-core-base core-cms-console core-kqp-common + kqp-common-events core-kqp-counters core-kqp-run_script_actor ydb-core-mind diff --git a/ydb/core/kqp/proxy_service/CMakeLists.windows-x86_64.txt b/ydb/core/kqp/proxy_service/CMakeLists.windows-x86_64.txt index 455ba7bfbd..2ce9ff4dd0 100644 --- a/ydb/core/kqp/proxy_service/CMakeLists.windows-x86_64.txt +++ b/ydb/core/kqp/proxy_service/CMakeLists.windows-x86_64.txt @@ -21,6 +21,7 @@ target_link_libraries(core-kqp-proxy_service PUBLIC ydb-core-base core-cms-console core-kqp-common + kqp-common-events core-kqp-counters core-kqp-run_script_actor ydb-core-mind diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp index 089211a488..7c81c9d5f9 100644 --- a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp +++ b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp @@ -705,6 +705,37 @@ public: Send(targetId, ev->Release().Release(), IEventHandle::FlagTrackDelivery, requestId); } + void Handle(TEvKqp::TEvCancelQueryRequest::TPtr& ev) { + auto& event = ev->Get()->Record; + auto& request = event.GetRequest(); + + auto 
traceId = event.GetTraceId(); + TKqpRequestInfo requestInfo(traceId); + auto sessionId = request.GetSessionId(); + ui64 requestId = PendingRequests.RegisterRequest(ev->Sender, ev->Cookie, traceId, TKqpEvents::EvCancelQueryRequest); + const TKqpSessionInfo* sessionInfo = LocalSessions->FindPtr(sessionId); + auto dbCounters = sessionInfo ? sessionInfo->DbCounters : nullptr; + KQP_PROXY_LOG_D("Received cancel query request, request_id: " << requestId << ", trace_id: " << traceId); + Counters->ReportCancelQuery(dbCounters, request.ByteSize()); + + PendingRequests.SetSessionId(requestId, sessionId, dbCounters); + + TActorId targetId; + if (sessionInfo) { + targetId = sessionInfo->WorkerId; + LocalSessions->StopIdleCheck(sessionInfo); + } else { + targetId = TryGetSessionTargetActor(sessionId, requestInfo, requestId); + if (!targetId) { + return; + } + } + + Send(targetId, ev->Release().Release(), IEventHandle::FlagTrackDelivery, requestId); + KQP_PROXY_LOG_D("Sent request to target, requestId: " << requestId + << ", targetId: " << targetId << ", sessionId: " << sessionId); + } + template<typename TEvent> void ForwardEvent(TEvent ev) { ui64 requestId = ev->Cookie; @@ -1120,6 +1151,8 @@ public: hFunc(TEvKqp::TEvProcessResponse, ForwardEvent); hFunc(TEvKqp::TEvCreateSessionRequest, Handle); hFunc(TEvKqp::TEvPingSessionRequest, Handle); + hFunc(TEvKqp::TEvCancelQueryRequest, Handle); + hFunc(TEvKqp::TEvCancelQueryResponse, ForwardEvent); hFunc(TEvKqp::TEvCloseSessionResponse, Handle); hFunc(TEvKqp::TEvPingSessionResponse, ForwardEvent); hFunc(TEvKqp::TEvInitiateShutdownRequest, Handle); @@ -1131,6 +1164,7 @@ public: hFunc(TEvPrivate::TEvScriptExecutionsTablesCreationFinished, Handle); hFunc(NKqp::TEvGetScriptExecutionOperation, Handle); hFunc(NKqp::TEvListScriptExecutionOperations, Handle); + hFunc(NKqp::TEvCancelScriptExecutionOperation, Handle); default: Y_FAIL("TKqpProxyService: unexpected event type: %" PRIx32 " event: %s", ev->GetTypeRewrite(), ev->ToString().data()); 
@@ -1336,6 +1370,10 @@ private: Register(CreateListScriptExecutionOperationsActor(std::move(ev))); } + void Handle(NKqp::TEvCancelScriptExecutionOperation::TPtr& ev) { + Register(CreateCancelScriptExecutionOperationActor(std::move(ev))); + } + private: NYql::NLog::YqlLoggerScope YqlLoggerScope; NKikimrConfig::TLogConfig LogConfig; diff --git a/ydb/core/kqp/proxy_service/kqp_script_executions.cpp b/ydb/core/kqp/proxy_service/kqp_script_executions.cpp index e7dc2021f0..a0a3677d37 100644 --- a/ydb/core/kqp/proxy_service/kqp_script_executions.cpp +++ b/ydb/core/kqp/proxy_service/kqp_script_executions.cpp @@ -2,6 +2,8 @@ #include <ydb/core/base/path.h> #include <ydb/core/base/tablet_pipe.h> +#include <ydb/core/kqp/common/events/events.h> +#include <ydb/core/kqp/common/kqp_script_executions.h> #include <ydb/core/kqp/run_script_actor/kqp_run_script_actor.h> #include <ydb/core/protos/services.pb.h> #include <ydb/core/tx/scheme_cache/scheme_cache.h> @@ -18,6 +20,7 @@ #include <library/cpp/actors/core/actor_bootstrapped.h> #include <library/cpp/actors/core/hfunc.h> +#include <library/cpp/actors/core/interconnect.h> #include <library/cpp/actors/core/log.h> #include <library/cpp/protobuf/json/json2proto.h> #include <library/cpp/protobuf/json/proto2json.h> @@ -27,26 +30,6 @@ namespace NKikimr::NKqp { -TString ScriptExecutionOperationFromExecutionId(const TString& executionId) { - Ydb::TOperationId operationId; - operationId.SetKind(Ydb::TOperationId::SCRIPT_EXECUTION); - NOperationId::AddOptionalValue(operationId, "actor_id", executionId); - return NOperationId::ProtoToString(operationId); -} - -TMaybe<TString> ScriptExecutionFromOperation(const TString& operationId) { - NOperationId::TOperationId operation(operationId); - return ScriptExecutionFromOperation(operation); -} - -TMaybe<TString> ScriptExecutionFromOperation(const NOperationId::TOperationId& operationId) { - const auto& values = operationId.GetValue("actor_id"); - if (values.empty() || !values[0]) { - return 
Nothing(); - } - return *values[0]; -} - namespace { #define KQP_PROXY_LOG_T(stream) LOG_TRACE_S(*TlsActivationContext, NKikimrServices::KQP_PROXY, stream) @@ -85,6 +68,7 @@ struct TEvPrivate { enum EEv : ui32 { EvCreateScriptOperationResponse = EventSpaceBegin(NActors::TEvents::ES_PRIVATE), EvCreateTableResponse, + EvLeaseCheckResult, EvEnd }; @@ -113,13 +97,26 @@ struct TEvPrivate { struct TEvCreateTableResponse : public NActors::TEventLocal<TEvCreateTableResponse, EvCreateTableResponse> { TEvCreateTableResponse() = default; }; + + struct TEvLeaseCheckResult : public NActors::TEventLocal<TEvLeaseCheckResult, EvLeaseCheckResult> { + TEvLeaseCheckResult(Ydb::StatusIds::StatusCode statusCode, NYql::TIssues&& issues, TMaybe<Ydb::StatusIds::StatusCode> operationStatus) + : Status(statusCode) + , Issues(std::move(issues)) + , OperationStatus(operationStatus) + { + } + + const Ydb::StatusIds::StatusCode Status; + const NYql::TIssues Issues; + const TMaybe<Ydb::StatusIds::StatusCode> OperationStatus; + }; }; class TQueryBase : public NKikimr::TQueryBase { public: - TQueryBase() - : NKikimr::TQueryBase(NKikimrServices::KQP_PROXY) + TQueryBase(TString sessionId = {}) + : NKikimr::TQueryBase(NKikimrServices::KQP_PROXY, sessionId) {} }; @@ -566,7 +563,59 @@ private: NActors::TActorId RunScriptActorId; }; -class TScriptExecutionFinisher : public TQueryBase { +class TScriptExecutionFinisherBase : public TQueryBase { +public: + using TQueryBase::TQueryBase; + + void FinishScriptExecution(const TString& database, const TString& executionId, Ydb::StatusIds::StatusCode operationStatus, Ydb::Query::ExecStatus execStatus, const NYql::TIssues& issues, TTxControl txControl = TTxControl::ContinueAndCommitTx()) { + TString sql = R"( + DECLARE $database AS Text; + DECLARE $execution_id AS Text; + DECLARE $operation_status AS Int32; + DECLARE $execution_status AS Int32; + DECLARE $issues AS JsonDocument; + + UPDATE `.metadata/script_executions` + SET + operation_status = 
$operation_status, + execution_status = $execution_status, + issues = $issues, + end_ts = CurrentUtcTimestamp() + WHERE database = $database AND execution_id = $execution_id; + + DELETE FROM `.metadata/script_execution_leases` + WHERE database = $database AND execution_id = $execution_id; + )"; + + NYdb::TParamsBuilder params; + params + .AddParam("$database") + .Utf8(database) + .Build() + .AddParam("$execution_id") + .Utf8(executionId) + .Build() + .AddParam("$operation_status") + .Int32(operationStatus) + .Build() + .AddParam("$execution_status") + .Int32(execStatus) + .Build() + .AddParam("$issues") + .JsonDocument(SerializeIssues(issues)) + .Build(); + + RunDataQuery(sql, &params, txControl); + } + + void FinishScriptExecution(const TString& database, const TString& executionId, Ydb::StatusIds::StatusCode operationStatus, Ydb::Query::ExecStatus execStatus, const TString& message, TTxControl txControl = TTxControl::ContinueAndCommitTx()) { + NYql::TIssues issues; + issues.AddIssue(message); + FinishScriptExecution(database, executionId, operationStatus, execStatus, issues, txControl); + } +}; + +class TScriptExecutionFinisher : public TScriptExecutionFinisherBase { public: TScriptExecutionFinisher( const TString& executionId, @@ -607,73 +656,40 @@ public: } void OnQueryResult() override { - if (ResultSets.size() != 1) { - Finish(Ydb::StatusIds::INTERNAL_ERROR, "Unexpected database response"); - return; - } - NYdb::TResultSetParser result(ResultSets[0]); - if (result.RowsCount() == 0) { - Finish(Ydb::StatusIds::BAD_REQUEST, "No such execution"); - return; - } - - result.TryNextRow(); - - const TMaybe<i64> leaseGenerationInDatabase = result.ColumnParser(0).GetOptionalInt64(); - if (!leaseGenerationInDatabase) { - Finish(Ydb::StatusIds::INTERNAL_ERROR, "Unknown lease generation"); - return; - } - - if (LeaseGeneration != static_cast<ui64>(*leaseGenerationInDatabase)) { - Finish(Ydb::StatusIds::PRECONDITION_FAILED, "Lease was lost"); - return; - } - - TString sql = R"( 
- DECLARE $database AS Text; - DECLARE $execution_id AS Text; - DECLARE $operation_status AS Int32; - DECLARE $execution_status AS Int32; - DECLARE $issues AS JsonDocument; - - UPDATE `.metadata/script_executions` - SET - operation_status = $operation_status, - execution_status = $execution_status, - issues = $issues, - end_ts = CurrentUtcTimestamp() - WHERE database = $database AND execution_id = $execution_id; + if (!FinishWasRun) { + if (ResultSets.size() != 1) { + Finish(Ydb::StatusIds::INTERNAL_ERROR, "Unexpected database response"); + return; + } + NYdb::TResultSetParser result(ResultSets[0]); + if (result.RowsCount() == 0) { + Finish(Ydb::StatusIds::BAD_REQUEST, "No such execution"); + return; + } - DELETE FROM `.metadata/script_execution_leases` - WHERE database = $database AND execution_id = $execution_id; - )"; + result.TryNextRow(); - NYdb::TParamsBuilder params; - params - .AddParam("$database") - .Utf8(Database) - .Build() - .AddParam("$execution_id") - .Utf8(ExecutionId) - .Build() - .AddParam("$operation_status") - .Int32(OperationStatus) - .Build() - .AddParam("$execution_status") - .Int32(ExecStatus) - .Build() - .AddParam("$issues") - .JsonDocument(SerializeIssues(Issues)) - .Build(); + const TMaybe<i64> leaseGenerationInDatabase = result.ColumnParser(0).GetOptionalInt64(); + if (!leaseGenerationInDatabase) { + Finish(Ydb::StatusIds::INTERNAL_ERROR, "Unknown lease generation"); + return; + } - Y_UNUSED(DeserializeIssues); + if (LeaseGeneration != static_cast<ui64>(*leaseGenerationInDatabase)) { + Finish(Ydb::StatusIds::PRECONDITION_FAILED, "Lease was lost"); + return; + } - RunDataQuery(sql, &params, TTxControl::ContinueAndCommitTx()); + FinishScriptExecution(Database, ExecutionId, OperationStatus, ExecStatus, Issues); + FinishWasRun = true; + } else { + Finish(); + } } void OnFinish(Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues) override { KQP_PROXY_LOG_D("Finish script execution operation. ExecutionId: " << ExecutionId << ". 
Lease generation: " << LeaseGeneration << ": " << Ydb::StatusIds::StatusCode_Name(status) << ". Issues: " << issues.ToOneLineString()); + Send(Owner, new TEvScriptExecutionFinished(status, std::move(issues))); } private: @@ -683,6 +699,7 @@ private: const Ydb::StatusIds::StatusCode OperationStatus; const Ydb::Query::ExecStatus ExecStatus; const NYql::TIssues Issues; + bool FinishWasRun = false; }; class TGetScriptExecutionOperationActor : public TQueryBase { @@ -948,6 +965,196 @@ private: std::vector<Ydb::Operations::Operation> Operations; }; +class TCheckLeaseStatusActor : public TScriptExecutionFinisherBase { +public: + TCheckLeaseStatusActor(const TString& database, const TString& executionId, Ydb::StatusIds::StatusCode statusOnExpiredLease = Ydb::StatusIds::ABORTED) + : Database(database) + , ExecutionId(executionId) + , StatusOnExpiredLease(statusOnExpiredLease) + {} + + void OnRunQuery() override { + const TString sql = R"( + DECLARE $database AS Text; + DECLARE $execution_id AS Text; + + SELECT operation_status FROM `.metadata/script_executions` + WHERE database = $database AND execution_id = $execution_id; + + SELECT lease_deadline FROM `.metadata/script_execution_leases` + WHERE database = $database AND execution_id = $execution_id; + )"; + + NYdb::TParamsBuilder params; + params + .AddParam("$database") + .Utf8(Database) + .Build() + .AddParam("$execution_id") + .Utf8(ExecutionId) + .Build(); + + RunDataQuery(sql, &params, TTxControl::BeginTx()); + } + + void OnQueryResult() override { + if (!FinishWasRun) { + if (ResultSets.size() != 2) { + Finish(Ydb::StatusIds::INTERNAL_ERROR, "Unexpected database response"); + return; + } + NYdb::TResultSetParser result(ResultSets[0]); + if (result.RowsCount() == 0) { + Finish(Ydb::StatusIds::BAD_REQUEST, "No such execution"); + return; + } + + result.TryNextRow(); + + TMaybe<i32> operationStatus = result.ColumnParser(0).GetOptionalInt32(); + TMaybe<TInstant> leaseDeadline; + + NYdb::TResultSetParser 
result2(ResultSets[1]); + + if (result2.RowsCount() > 0) { + result2.TryNextRow(); + + leaseDeadline = result2.ColumnParser(0).GetOptionalTimestamp(); + } + + if (leaseDeadline) { + if (operationStatus) { + Finish(Ydb::StatusIds::INTERNAL_ERROR, "Invalid operation state"); + } else if (*leaseDeadline < RunStartTime) { + FinishWasRun = true; + FinishScriptExecution(Database, ExecutionId, StatusOnExpiredLease, Ydb::Query::EXEC_STATUS_ABORTED, "Lease expired"); + } else { + // OperationStatus is Nothing(): currently running + Finish(); + } + } else if (operationStatus) { + OperationStatus = static_cast<Ydb::StatusIds::StatusCode>(*operationStatus); + Finish(); + } else { + Finish(Ydb::StatusIds::INTERNAL_ERROR, "Invalid operation state"); + } + } else { + OperationStatus = StatusOnExpiredLease; + Finish(); + } + } + + void OnFinish(Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues) override { + Send(Owner, new TEvPrivate::TEvLeaseCheckResult(status, std::move(issues), OperationStatus)); + } + +private: + const TInstant RunStartTime = TInstant::Now(); + const TString Database; + const TString ExecutionId; + const Ydb::StatusIds::StatusCode StatusOnExpiredLease; + TMaybe<Ydb::StatusIds::StatusCode> OperationStatus; + bool FinishWasRun = false; +}; + +class TCancelScriptExecutionOperationActor : public NActors::TActorBootstrapped<TCancelScriptExecutionOperationActor> { +public: + TCancelScriptExecutionOperationActor(TEvCancelScriptExecutionOperation::TPtr ev) + : Request(std::move(ev)) + {} + + void Bootstrap() { + const TMaybe<TString> executionId = NKqp::ScriptExecutionFromOperation(Request->Get()->OperationId); + if (!executionId) { + return Reply(Ydb::StatusIds::BAD_REQUEST, "Incorrect operation id"); + } + ExecutionId = *executionId; + + if (!NKqp::ScriptExecutionIdToActorId(ExecutionId, RunScriptActor)) { + return Reply(Ydb::StatusIds::BAD_REQUEST, "Incorrect operation id"); + } + + Become(&TCancelScriptExecutionOperationActor::StateFunc); + Register(new 
TCheckLeaseStatusActor(Request->Get()->Database, ExecutionId)); + } + + STRICT_STFUNC(StateFunc, + hFunc(TEvPrivate::TEvLeaseCheckResult, Handle); + hFunc(TEvKqp::TEvCancelScriptExecutionResponse, Handle); + hFunc(NActors::TEvents::TEvUndelivered, Handle); + hFunc(NActors::TEvInterconnect::TEvNodeDisconnected, Handle); + ) + + void Handle(TEvPrivate::TEvLeaseCheckResult::TPtr& ev) { + if (ev->Get()->Status == Ydb::StatusIds::SUCCESS) { + if (ev->Get()->OperationStatus) { + Reply(Ydb::StatusIds::PRECONDITION_FAILED); // Already finished. + } else { + if (CancelSent) { // We have not found the actor, but after it status of the operation is not defined, something strange happened. + Reply(Ydb::StatusIds::INTERNAL_ERROR, "Failed to cancel script execution operation"); + } else { + SendCancelToRunScriptActor(); // The race: operation is still working, but it can finish before it receives cancel signal. Try to cancel first and then maybe check its status. + } + } + } else { + Reply(ev->Get()->Status, std::move(ev->Get()->Issues)); // Error getting operation in database. + } + } + + void SendCancelToRunScriptActor() { + ui64 flags = IEventHandle::FlagTrackDelivery; + if (RunScriptActor.NodeId() != SelfId().NodeId()) { + flags |= IEventHandle::FlagSubscribeOnSession; + SubscribedOnSession = RunScriptActor.NodeId(); + } + Send(RunScriptActor, new TEvKqp::TEvCancelScriptExecutionRequest(), flags); + CancelSent = true; + } + + void Handle(TEvKqp::TEvCancelScriptExecutionResponse::TPtr& ev) { + NYql::TIssues issues; + NYql::IssuesFromMessage(ev->Get()->Record.GetIssues(), issues); + Reply(ev->Get()->Record.GetStatus(), std::move(issues)); + } + + void Handle(NActors::TEvents::TEvUndelivered::TPtr& ev) { + if (ev->Get()->Reason == NActors::TEvents::TEvUndelivered::ReasonActorUnknown) { // The actor probably had finished before our cancel message arrived. + Register(new TCheckLeaseStatusActor(Request->Get()->Database, ExecutionId)); // Check if the operation has finished. 
+ } else { + Reply(Ydb::StatusIds::UNAVAILABLE, "Failed to deliver request to destination"); + } + } + + void Handle(NActors::TEvInterconnect::TEvNodeDisconnected::TPtr&) { + Reply(Ydb::StatusIds::UNAVAILABLE, "Failed to deliver request to destination"); + } + + void Reply(Ydb::StatusIds::StatusCode status, NYql::TIssues issues = {}) { + Send(Request->Sender, new TEvCancelScriptExecutionOperationResponse(status, std::move(issues))); + PassAway(); + } + + void Reply(Ydb::StatusIds::StatusCode status, const TString& message) { + NYql::TIssues issues; + issues.AddIssue(message); + Reply(status, std::move(issues)); + } + + void PassAway() override { + if (SubscribedOnSession) { + Send(TActivationContext::InterconnectProxy(*SubscribedOnSession), new TEvents::TEvUnsubscribe()); + } + NActors::TActorBootstrapped<TCancelScriptExecutionOperationActor>::PassAway(); + } + +private: + TEvCancelScriptExecutionOperation::TPtr Request; + TString ExecutionId; + NActors::TActorId RunScriptActor; + TMaybe<ui32> SubscribedOnSession; + bool CancelSent = false; +}; + } // anonymous namespace NActors::IActor* CreateScriptExecutionCreatorActor(TEvKqp::TEvScriptRequest::TPtr&& ev) { @@ -977,4 +1184,8 @@ NActors::IActor* CreateListScriptExecutionOperationsActor(TEvListScriptExecution return new TListScriptExecutionOperationsActor(std::move(ev)); } +NActors::IActor* CreateCancelScriptExecutionOperationActor(TEvCancelScriptExecutionOperation::TPtr ev) { + return new TCancelScriptExecutionOperationActor(std::move(ev)); +} + } // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/proxy_service/kqp_script_executions.h b/ydb/core/kqp/proxy_service/kqp_script_executions.h index 6022a1f5d7..11867d5781 100644 --- a/ydb/core/kqp/proxy_service/kqp_script_executions.h +++ b/ydb/core/kqp/proxy_service/kqp_script_executions.h @@ -12,11 +12,6 @@ namespace NKikimr::NKqp { -// Helpers. 
-TString ScriptExecutionOperationFromExecutionId(const TString& executionId); -TMaybe<TString> ScriptExecutionFromOperation(const TString& operationId); -TMaybe<TString> ScriptExecutionFromOperation(const NOperationId::TOperationId& operationId); - // Creates all needed tables. // Sends result event back when the work is done. NActors::IActor* CreateScriptExecutionsTablesCreator(THolder<NActors::IEventBase> resultEvent); @@ -27,6 +22,7 @@ NActors::IActor* CreateScriptExecutionCreatorActor(TEvKqp::TEvScriptRequest::TPt // Operation API impl. NActors::IActor* CreateGetScriptExecutionOperationActor(TEvGetScriptExecutionOperation::TPtr ev); NActors::IActor* CreateListScriptExecutionOperationsActor(TEvListScriptExecutionOperations::TPtr ev); +NActors::IActor* CreateCancelScriptExecutionOperationActor(TEvCancelScriptExecutionOperation::TPtr ev); // Updates status in database. NActors::IActor* CreateScriptExecutionFinisher( diff --git a/ydb/core/kqp/run_script_actor/CMakeLists.darwin-x86_64.txt b/ydb/core/kqp/run_script_actor/CMakeLists.darwin-x86_64.txt index 0359b1c93e..a612f9027d 100644 --- a/ydb/core/kqp/run_script_actor/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/kqp/run_script_actor/CMakeLists.darwin-x86_64.txt @@ -17,6 +17,7 @@ target_link_libraries(core-kqp-run_script_actor PUBLIC cpp-actors-core ydb-core-base ydb-core-protos + kqp-common-events core-kqp-executer_actor api-protos ) diff --git a/ydb/core/kqp/run_script_actor/CMakeLists.linux-aarch64.txt b/ydb/core/kqp/run_script_actor/CMakeLists.linux-aarch64.txt index fb65e2fbd0..fea0e3d0a0 100644 --- a/ydb/core/kqp/run_script_actor/CMakeLists.linux-aarch64.txt +++ b/ydb/core/kqp/run_script_actor/CMakeLists.linux-aarch64.txt @@ -18,6 +18,7 @@ target_link_libraries(core-kqp-run_script_actor PUBLIC cpp-actors-core ydb-core-base ydb-core-protos + kqp-common-events core-kqp-executer_actor api-protos ) diff --git a/ydb/core/kqp/run_script_actor/CMakeLists.linux-x86_64.txt 
b/ydb/core/kqp/run_script_actor/CMakeLists.linux-x86_64.txt index fb65e2fbd0..fea0e3d0a0 100644 --- a/ydb/core/kqp/run_script_actor/CMakeLists.linux-x86_64.txt +++ b/ydb/core/kqp/run_script_actor/CMakeLists.linux-x86_64.txt @@ -18,6 +18,7 @@ target_link_libraries(core-kqp-run_script_actor PUBLIC cpp-actors-core ydb-core-base ydb-core-protos + kqp-common-events core-kqp-executer_actor api-protos ) diff --git a/ydb/core/kqp/run_script_actor/CMakeLists.windows-x86_64.txt b/ydb/core/kqp/run_script_actor/CMakeLists.windows-x86_64.txt index 0359b1c93e..a612f9027d 100644 --- a/ydb/core/kqp/run_script_actor/CMakeLists.windows-x86_64.txt +++ b/ydb/core/kqp/run_script_actor/CMakeLists.windows-x86_64.txt @@ -17,6 +17,7 @@ target_link_libraries(core-kqp-run_script_actor PUBLIC cpp-actors-core ydb-core-base ydb-core-protos + kqp-common-events core-kqp-executer_actor api-protos ) diff --git a/ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp b/ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp index f046094109..d440065241 100644 --- a/ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp +++ b/ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp @@ -1,6 +1,7 @@ #include "kqp_run_script_actor.h" #include <ydb/core/base/kikimr_issue.h> +#include <ydb/core/kqp/common/events/events.h> #include <ydb/core/kqp/common/kqp.h> #include <ydb/core/kqp/executer_actor/kqp_executer.h> #include <ydb/core/kqp/proxy_service/kqp_script_executions.h> @@ -14,6 +15,8 @@ #include <util/generic/size_literals.h> +#include <forward_list> + #define LOG_T(stream) LOG_TRACE_S(NActors::TActivationContext::AsActorContext(), NKikimrServices::KQP_EXECUTER, SelfId() << " " << stream); #define LOG_D(stream) LOG_DEBUG_S(NActors::TActivationContext::AsActorContext(), NKikimrServices::KQP_EXECUTER, SelfId() << " " << stream); #define LOG_I(stream) LOG_INFO_S(NActors::TActivationContext::AsActorContext(), NKikimrServices::KQP_EXECUTER, SelfId() << " " << stream); @@ -28,6 +31,13 @@ constexpr ui64 
RESULT_SIZE_LIMIT = 10_MB; constexpr int RESULT_ROWS_LIMIT = 1000; class TRunScriptActor : public NActors::TActorBootstrapped<TRunScriptActor> { + enum class ERunState { + Created, + Running, + Cancelling, + Cancelled, + Finished, + }; public: TRunScriptActor(const NKikimrKqp::TEvQueryRequest& request, const TString& database, ui64 leaseGeneration) : Request(request) @@ -45,34 +55,89 @@ private: STRICT_STFUNC(StateFunc, hFunc(NActors::TEvents::TEvWakeup, Handle); hFunc(NActors::TEvents::TEvPoison, Handle); - hFunc(NKqp::TEvKqpExecuter::TEvStreamData, Handle); - hFunc(NKqp::TEvKqp::TEvQueryResponse, Handle); - hFunc(NKqp::TEvKqp::TEvFetchScriptResultsRequest, Handle); + hFunc(TEvKqpExecuter::TEvStreamData, Handle); + hFunc(TEvKqp::TEvQueryResponse, Handle); + hFunc(TEvKqp::TEvFetchScriptResultsRequest, Handle); + hFunc(TEvKqp::TEvCreateSessionResponse, Handle); + IgnoreFunc(TEvKqp::TEvCloseSessionResponse); + hFunc(TEvKqp::TEvCancelQueryResponse, Handle); + hFunc(TEvKqp::TEvCancelScriptExecutionRequest, Handle); + hFunc(TEvScriptExecutionFinished, Handle); ) + void SendToKqpProxy(THolder<NActors::IEventBase> ev) { + if (!Send(MakeKqpProxyID(SelfId().NodeId()), ev.Release())) { + Issues.AddIssue(MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR, "Internal error")); + Finish(Ydb::StatusIds::INTERNAL_ERROR); + } + } + + void CreateSession() { + auto ev = MakeHolder<TEvKqp::TEvCreateSessionRequest>(); + ev->Record.MutableRequest()->SetDatabase(Request.GetRequest().GetDatabase()); + + SendToKqpProxy(std::move(ev)); + } + + void Handle(TEvKqp::TEvCreateSessionResponse::TPtr& ev) { + const auto& resp = ev->Get()->Record; + if (resp.GetYdbStatus() != Ydb::StatusIds::SUCCESS) { + if (resp.GetResourceExhausted()) { + Finish(Ydb::StatusIds::OVERLOADED); + } else { + Finish(Ydb::StatusIds::INTERNAL_ERROR); + } + } else { + SessionId = resp.GetResponse().GetSessionId(); + + if (RunState == ERunState::Running) { + Start(); + } else { + CloseSession(); + } + } + } + + void 
CloseSession() { + if (SessionId) { + auto ev = MakeHolder<TEvKqp::TEvCloseSessionRequest>(); + ev->Record.MutableRequest()->SetSessionId(SessionId); + + Send(MakeKqpProxyID(SelfId().NodeId()), ev.Release()); + SessionId.clear(); + } + } + void Start() { - auto ev = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>(); + auto ev = MakeHolder<TEvKqp::TEvQueryRequest>(); ev->Record = Request; + ev->Record.MutableRequest()->SetSessionId(SessionId); NActors::ActorIdToProto(SelfId(), ev->Record.MutableRequestActorId()); - if (!Send(NKqp::MakeKqpProxyID(SelfId().NodeId()), ev.Release())) { - Issues.AddIssue(MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR, "Internal error")); - Finish(Ydb::StatusIds::INTERNAL_ERROR); - } + SendToKqpProxy(std::move(ev)); } // TODO: remove this after there will be a normal way to store results and generate execution id void Handle(NActors::TEvents::TEvWakeup::TPtr&) { - Start(); + if (RunState == ERunState::Created) { + RunState = ERunState::Running; + CreateSession(); + } } + // Event in case of error in registering script in database + // Just pass away, because we have not started execution. 
void Handle(NActors::TEvents::TEvPoison::TPtr&) { + Y_VERIFY(RunState == ERunState::Created); PassAway(); } - void Handle(NKqp::TEvKqpExecuter::TEvStreamData::TPtr& ev) { - auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(); + void Handle(TEvKqpExecuter::TEvStreamData::TPtr& ev) { + if (RunState != ERunState::Running) { + return; + } + auto resp = MakeHolder<TEvKqpExecuter::TEvStreamDataAck>(); resp->Record.SetSeqNo(ev->Get()->Record.GetSeqNo()); resp->Record.SetFreeSpace(RESULT_SIZE_LIMIT); @@ -87,7 +152,10 @@ private: } } - void Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev) { + void Handle(TEvKqp::TEvQueryResponse::TPtr& ev) { + if (RunState != ERunState::Running) { + return; + } auto& record = ev->Get()->Record.GetRef(); const auto& issueMessage = record.GetResponse().GetQueryIssues(); @@ -96,11 +164,16 @@ private: Finish(record.GetYdbStatus()); } - void Handle(NKqp::TEvKqp::TEvFetchScriptResultsRequest::TPtr& ev) { - auto resp = MakeHolder<NKqp::TEvKqp::TEvFetchScriptResultsResponse>(); + void Handle(TEvKqp::TEvFetchScriptResultsRequest::TPtr& ev) { + auto resp = MakeHolder<TEvKqp::TEvFetchScriptResultsResponse>(); if (!IsFinished()) { - resp->Record.SetStatus(Ydb::StatusIds::BAD_REQUEST); - resp->Record.AddIssues()->set_message("Results are not ready"); + if (RunState == ERunState::Created || RunState == ERunState::Running) { + resp->Record.SetStatus(Ydb::StatusIds::BAD_REQUEST); + resp->Record.AddIssues()->set_message("Results are not ready"); + } else if (RunState == ERunState::Cancelled || RunState == ERunState::Cancelling) { + resp->Record.SetStatus(Ydb::StatusIds::BAD_REQUEST); + resp->Record.AddIssues()->set_message("Script execution is cancelled"); + } } else { if (!ResultSets.empty()) { resp->Record.SetResultSetIndex(0); @@ -126,7 +199,55 @@ private: Send(ev->Sender, std::move(resp)); } - void MergeResultSet(NKqp::TEvKqpExecuter::TEvStreamData::TPtr& ev) { + void Handle(TEvKqp::TEvCancelScriptExecutionRequest::TPtr& ev) { + switch 
(RunState) { + case ERunState::Created: + CancelRequests.emplace_front(std::move(ev)); + Finish(Ydb::StatusIds::CANCELLED, ERunState::Cancelling); + break; + case ERunState::Running: + CancelRequests.emplace_front(std::move(ev)); + CancelRunningQuery(); + break; + case ERunState::Cancelling: + CancelRequests.emplace_front(std::move(ev)); + break; + case ERunState::Cancelled: + Send(ev->Sender, new TEvKqp::TEvCancelScriptExecutionResponse(Ydb::StatusIds::PRECONDITION_FAILED, "Already cancelled")); + break; + case ERunState::Finished: + Send(ev->Sender, new TEvKqp::TEvCancelScriptExecutionResponse(Ydb::StatusIds::PRECONDITION_FAILED, "Already finished")); + break; + } + } + + void CancelRunningQuery() { + if (SessionId.empty()) { + Finish(Ydb::StatusIds::CANCELLED, ERunState::Cancelling); + } else { + RunState = ERunState::Cancelling; + auto ev = MakeHolder<TEvKqp::TEvCancelQueryRequest>(); + ev->Record.MutableRequest()->SetSessionId(SessionId); + + Send(MakeKqpProxyID(SelfId().NodeId()), ev.Release()); + } + } + + void Handle(TEvKqp::TEvCancelQueryResponse::TPtr&) { + Finish(Ydb::StatusIds::CANCELLED, ERunState::Cancelling); + } + + void Handle(TEvScriptExecutionFinished::TPtr& ev) { + if (RunState == ERunState::Cancelling) { + RunState = ERunState::Cancelled; + for (auto& req : CancelRequests) { + Send(req->Sender, new TEvKqp::TEvCancelScriptExecutionResponse(ev->Get()->Status, ev->Get()->Issues)); + } + CancelRequests.clear(); + } + } + + void MergeResultSet(TEvKqpExecuter::TEvStreamData::TPtr& ev) { if (ResultSets.empty()) { ResultSets.emplace_back(ev->Get()->Record.GetResultSet()); return; @@ -153,18 +274,31 @@ private: static Ydb::Query::ExecStatus GetExecStatusFromStatusCode(Ydb::StatusIds::StatusCode status) { if (status == Ydb::StatusIds::SUCCESS) { return Ydb::Query::EXEC_STATUS_COMPLETED; + } else if (status == Ydb::StatusIds::CANCELLED) { + return Ydb::Query::EXEC_STATUS_CANCELLED; } else { return Ydb::Query::EXEC_STATUS_FAILED; } } - void 
Finish(Ydb::StatusIds::StatusCode status) { + void Finish(Ydb::StatusIds::StatusCode status, ERunState runState = ERunState::Finished) { + RunState = runState; Status = status; Register(CreateScriptExecutionFinisher(ActorIdToScriptExecutionId(SelfId()), Database, LeaseGeneration, status, GetExecStatusFromStatusCode(status), Issues)); + if (RunState == ERunState::Cancelling) { + Issues.AddIssue("Script execution is cancelled"); + ResultSets.clear(); + } + CloseSession(); + } + + void PassAway() override { + CloseSession(); + NActors::TActorBootstrapped<TRunScriptActor>::PassAway(); } bool IsFinished() const { - return Status != Ydb::StatusIds::STATUS_CODE_UNSPECIFIED; + return RunState == ERunState::Finished; } bool IsTruncated() const { @@ -175,6 +309,9 @@ private: const NKikimrKqp::TEvQueryRequest Request; const TString Database; const ui64 LeaseGeneration; + TString SessionId; + ERunState RunState = ERunState::Created; + std::forward_list<TEvKqp::TEvCancelScriptExecutionRequest::TPtr> CancelRequests; // Result NYql::TIssues Issues; diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp index e698bde798..83b35a1ba7 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp @@ -1749,6 +1749,21 @@ public: Cleanup(IsFatalError(ydbStatus)); } + void Handle(TEvKqp::TEvCancelQueryRequest::TPtr& ev) { + { + auto abort = MakeHolder<NYql::NDq::TEvDq::TEvAbortExecution>(); + abort->Record.SetStatusCode(NYql::NDqProto::StatusIds::CANCELLED); + abort->Record.AddIssues()->set_message("Canceled"); + Send(SelfId(), abort.Release()); + } + + { + auto resp = MakeHolder<TEvKqp::TEvCancelQueryResponse>(); + resp->Record.SetStatus(Ydb::StatusIds::SUCCESS); + Send(ev->Sender, resp.Release(), 0, ev->Cookie); + } + } + STFUNC(ReadyState) { try { switch (ev->GetTypeRewrite()) { @@ -1757,6 +1772,7 @@ public: hFunc(TEvKqp::TEvCloseSessionRequest, HandleReady); 
hFunc(TEvKqp::TEvInitiateSessionShutdown, Handle); hFunc(TEvKqp::TEvContinueShutdown, Handle); + hFunc(TEvKqp::TEvCancelQueryRequest, Handle); // forgotten messages from previous aborted request hFunc(TEvKqp::TEvCompileResponse, HandleNoop); @@ -1785,6 +1801,7 @@ public: hFunc(TEvKqp::TEvInitiateSessionShutdown, Handle); hFunc(TEvKqp::TEvContinueShutdown, Handle); hFunc(NGRpcService::TEvClientLost, HandleClientLost); + hFunc(TEvKqp::TEvCancelQueryRequest, Handle); // forgotten messages from previous aborted request hFunc(TEvKqpExecuter::TEvTxResponse, HandleNoop); @@ -1815,6 +1832,7 @@ public: hFunc(TEvKqp::TEvInitiateSessionShutdown, Handle); hFunc(TEvKqp::TEvContinueShutdown, Handle); hFunc(NGRpcService::TEvClientLost, HandleClientLost); + hFunc(TEvKqp::TEvCancelQueryRequest, Handle); // forgotten messages from previous aborted request hFunc(TEvKqp::TEvCompileResponse, Handle); @@ -1844,6 +1862,7 @@ public: hFunc(TEvKqp::TEvInitiateSessionShutdown, Handle); hFunc(TEvKqp::TEvContinueShutdown, Handle); hFunc(NGRpcService::TEvClientLost, HandleNoop); + hFunc(TEvKqp::TEvCancelQueryRequest, HandleNoop); // forgotten messages from previous aborted request hFunc(TEvKqp::TEvCompileResponse, HandleNoop); diff --git a/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp b/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp index cf65f59613..e73b0b6017 100644 --- a/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp +++ b/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp @@ -302,6 +302,40 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { UNIT_ASSERT_VALUES_EQUAL(listed, ScriptExecutionsCount); UNIT_ASSERT_EQUAL(ops, listedOps); } + + Y_UNIT_TEST(CancelScriptExecution) { + auto kikimr = DefaultKikimrRunner(); + auto db = kikimr.GetQueryClient(); + + auto scriptExecutionOperation = db.ExecuteScript(R"( + SELECT * FROM EightShard WHERE Text = "Value2" AND Data = 1 ORDER BY Key; + SELECT * FROM TwoShard WHERE Key < 10 ORDER BY Key; + )").ExtractValueSync(); + 
UNIT_ASSERT_VALUES_EQUAL_C(scriptExecutionOperation.Status().GetStatus(), EStatus::SUCCESS, scriptExecutionOperation.Status().GetIssues().ToString()); + UNIT_ASSERT(scriptExecutionOperation.Metadata().ExecutionId); + + NYdb::NOperation::TOperationClient opClient(kikimr.GetDriver()); + std::vector<NYdb::TAsyncStatus> cancelFutures(3); + // Check races also + for (auto& f : cancelFutures) { + f = opClient.Cancel(scriptExecutionOperation.Id()); + } + + for (auto& f : cancelFutures) { + auto cancelStatus = f.ExtractValueSync(); + UNIT_ASSERT_C(cancelStatus.GetStatus() == NYdb::EStatus::SUCCESS || cancelStatus.GetStatus() == NYdb::EStatus::PRECONDITION_FAILED, cancelStatus.GetIssues().ToString()); + } + + auto op = opClient.Get<NYdb::NQuery::TScriptExecutionOperation>(scriptExecutionOperation.Id()).ExtractValueSync(); + UNIT_ASSERT(op.Ready()); + UNIT_ASSERT(op.Metadata().ExecStatus == EExecStatus::Completed || op.Metadata().ExecStatus == EExecStatus::Canceled); + UNIT_ASSERT_EQUAL(op.Metadata().ExecutionId, scriptExecutionOperation.Metadata().ExecutionId); + UNIT_ASSERT(op.Status().GetStatus() == NYdb::EStatus::SUCCESS || op.Status().GetStatus() == NYdb::EStatus::CANCELLED); + + // Check cancel for completed query + auto cancelStatus = opClient.Cancel(scriptExecutionOperation.Id()).ExtractValueSync(); + UNIT_ASSERT_C(cancelStatus.GetStatus() == NYdb::EStatus::PRECONDITION_FAILED, cancelStatus.GetIssues().ToString()); + } } } // namespace NKqp diff --git a/ydb/core/protos/kqp.proto b/ydb/core/protos/kqp.proto index 2c155c661f..6b29828fec 100644 --- a/ydb/core/protos/kqp.proto +++ b/ydb/core/protos/kqp.proto @@ -345,6 +345,20 @@ message TEvDataQueryStreamPart { repeated NKikimrMiniKQL.TResult Results = 2; }; +message TCancelQueryRequest { + optional bytes SessionId = 1; +}; + +message TEvCancelQueryRequest { + optional TCancelQueryRequest Request = 1; + optional string TraceId = 2; +}; + +message TEvCancelQueryResponse { + optional Ydb.StatusIds.StatusCode Status = 1; + 
repeated Ydb.Issue.IssueMessage Issues = 2; +} + // Executer message TExecuterTxRequest { @@ -612,3 +626,12 @@ message TEvFetchScriptResultsResponse { optional uint64 ResultSetIndex = 3; optional Ydb.ResultSet ResultSet = 4; } + +// Request that is sent to run script actor to cancel execution and write finish status to database. +message TEvCancelScriptExecutionRequest { +} + +message TEvCancelScriptExecutionResponse { + optional Ydb.StatusIds.StatusCode Status = 1; + repeated Ydb.Issue.IssueMessage Issues = 2; +} |