diff options
author | galaxycrab <UgnineSirdis@ydb.tech> | 2023-06-09 15:06:01 +0300 |
---|---|---|
committer | galaxycrab <UgnineSirdis@ydb.tech> | 2023-06-09 15:06:01 +0300 |
commit | 771b2d98e7a5045e038d42923b65879135603368 (patch) | |
tree | 8028d2ec43207b4eb7a5c19bb3a2602f46cfb90b | |
parent | be521d3ff04fca2a5fc67baad12a6e9b1da6b773 (diff) | |
download | ydb-771b2d98e7a5045e038d42923b65879135603368.tar.gz |
Properly process dead leases for script execution operations
-rw-r--r-- | ydb/core/grpc_services/query/rpc_fetch_script_results.cpp | 4 | ||||
-rw-r--r-- | ydb/core/kqp/proxy_service/kqp_script_executions.cpp | 361 | ||||
-rw-r--r-- | ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp | 37 | ||||
-rw-r--r-- | ydb/core/kqp/ut/service/kqp_query_service_ut.cpp | 3 | ||||
-rw-r--r-- | ydb/library/query_actor/query_actor.cpp | 45 | ||||
-rw-r--r-- | ydb/library/query_actor/query_actor.h | 25 | ||||
-rw-r--r-- | ydb/library/query_actor/query_actor_ut.cpp | 22 |
7 files changed, 384 insertions, 113 deletions
diff --git a/ydb/core/grpc_services/query/rpc_fetch_script_results.cpp b/ydb/core/grpc_services/query/rpc_fetch_script_results.cpp index 68247a6153..3563c371f4 100644 --- a/ydb/core/grpc_services/query/rpc_fetch_script_results.cpp +++ b/ydb/core/grpc_services/query/rpc_fetch_script_results.cpp @@ -97,12 +97,12 @@ private: if (ev->Get()->Reason == NActors::TEvents::TEvUndelivered::ReasonActorUnknown) { Reply(Ydb::StatusIds::NOT_FOUND, "No such execution"); } else { - Reply(Ydb::StatusIds::UNAVAILABLE, "Failed to deliver request to destination"); + Reply(Ydb::StatusIds::UNAVAILABLE, "Failed to deliver fetch request to destination"); } } void Handle(NActors::TEvInterconnect::TEvNodeDisconnected::TPtr&) { - Reply(Ydb::StatusIds::UNAVAILABLE, "Failed to deliver request to destination"); + Reply(Ydb::StatusIds::UNAVAILABLE, "Failed to deliver fetch request to destination"); } void PassAway() override { diff --git a/ydb/core/kqp/proxy_service/kqp_script_executions.cpp b/ydb/core/kqp/proxy_service/kqp_script_executions.cpp index a0a3677d37..4a7cbaff4d 100644 --- a/ydb/core/kqp/proxy_service/kqp_script_executions.cpp +++ b/ydb/core/kqp/proxy_service/kqp_script_executions.cpp @@ -99,16 +99,26 @@ struct TEvPrivate { }; struct TEvLeaseCheckResult : public NActors::TEventLocal<TEvLeaseCheckResult, EvLeaseCheckResult> { - TEvLeaseCheckResult(Ydb::StatusIds::StatusCode statusCode, NYql::TIssues&& issues, TMaybe<Ydb::StatusIds::StatusCode> operationStatus) + TEvLeaseCheckResult(Ydb::StatusIds::StatusCode statusCode, NYql::TIssues&& issues) : Status(statusCode) , Issues(std::move(issues)) - , OperationStatus(operationStatus) { } + TEvLeaseCheckResult(TMaybe<Ydb::StatusIds::StatusCode> operationStatus, + TMaybe<Ydb::Query::ExecStatus> executionStatus, + TMaybe<NYql::TIssues> operationIssues) + : Status(Ydb::StatusIds::SUCCESS) + , OperationStatus(operationStatus) + , ExecutionStatus(executionStatus) + , OperationIssues(operationIssues) + {} + const Ydb::StatusIds::StatusCode Status; const NYql::TIssues Issues; const TMaybe<Ydb::StatusIds::StatusCode> OperationStatus; + const TMaybe<Ydb::Query::ExecStatus> ExecutionStatus; + const TMaybe<NYql::TIssues> OperationIssues; }; }; @@ -567,7 +577,7 @@ class TScriptExecutionFinisherBase : public TQueryBase { public: using TQueryBase::TQueryBase; - void FinishScriptExecution(const TString& database, const TString& executionId, Ydb::StatusIds::StatusCode operationStatus, Ydb::Query::ExecStatus execStatus, const NYql::TIssues& issues, TTxControl txControl = TTxControl::ContinueAndCommitTx()) { + void FinishScriptExecution(const TString& database, const TString& executionId, Ydb::StatusIds::StatusCode operationStatus, Ydb::Query::ExecStatus execStatus, const NYql::TIssues& issues = LeaseExpiredIssues(), TTxControl txControl = TTxControl::ContinueAndCommitTx()) { TString sql = R"( DECLARE $database AS Text; DECLARE $execution_id AS Text; @@ -609,9 +619,17 @@ public: } void FinishScriptExecution(const TString& database, const TString& executionId, Ydb::StatusIds::StatusCode operationStatus, Ydb::Query::ExecStatus execStatus, const TString& message, TTxControl txControl = TTxControl::ContinueAndCommitTx()) { + FinishScriptExecution(database, executionId, operationStatus, execStatus, IssuesFromMessage(message), txControl); + } + + static NYql::TIssues IssuesFromMessage(const TString& message) { NYql::TIssues issues; issues.AddIssue(message); - FinishScriptExecution(database, executionId, operationStatus, execStatus, issues, txControl); + return issues; + } + + static NYql::TIssues LeaseExpiredIssues() { + return IssuesFromMessage("Lease expired"); } }; @@ -702,7 +720,117 @@ private: bool FinishWasRun = false; }; -class TGetScriptExecutionOperationActor : public TQueryBase { +class TCheckLeaseStatusActor : public TScriptExecutionFinisherBase { +public: + TCheckLeaseStatusActor(const TString& database, const TString& executionId, Ydb::StatusIds::StatusCode statusOnExpiredLease = Ydb::StatusIds::ABORTED, ui64 cookie = 0) + : Database(database) + , ExecutionId(executionId) + , StatusOnExpiredLease(statusOnExpiredLease) + , Cookie(cookie) + {} + + void OnRunQuery() override { + const TString sql = R"( + DECLARE $database AS Text; + DECLARE $execution_id AS Text; + + SELECT operation_status, execution_status, issues FROM `.metadata/script_executions` + WHERE database = $database AND execution_id = $execution_id; + + SELECT lease_deadline FROM `.metadata/script_execution_leases` + WHERE database = $database AND execution_id = $execution_id; + )"; + + NYdb::TParamsBuilder params; + params + .AddParam("$database") + .Utf8(Database) + .Build() + .AddParam("$execution_id") + .Utf8(ExecutionId) + .Build(); + + RunDataQuery(sql, ¶ms, TTxControl::BeginTx()); + SetQueryResultHandler(&TCheckLeaseStatusActor::OnResult); + } + + void OnResult() { + if (ResultSets.size() != 2) { + Finish(Ydb::StatusIds::INTERNAL_ERROR, "Unexpected database response"); + return; + } + NYdb::TResultSetParser result(ResultSets[0]); + if (result.RowsCount() == 0) { + Finish(Ydb::StatusIds::BAD_REQUEST, "No such execution"); + return; + } + + result.TryNextRow(); + + TMaybe<i32> operationStatus = result.ColumnParser("operation_status").GetOptionalInt32(); + TMaybe<TInstant> leaseDeadline; + + NYdb::TResultSetParser result2(ResultSets[1]); + + if (result2.RowsCount() > 0) { + result2.TryNextRow(); + + leaseDeadline = result2.ColumnParser(0).GetOptionalTimestamp(); + } + + if (leaseDeadline) { + if (operationStatus) { + Finish(Ydb::StatusIds::INTERNAL_ERROR, "Invalid operation state"); + } else if (*leaseDeadline < RunStartTime) { + FinishScriptExecution(Database, ExecutionId, StatusOnExpiredLease, Ydb::Query::EXEC_STATUS_ABORTED); + SetQueryResultHandler(&TCheckLeaseStatusActor::OnFinishScriptExecution); + } else { + // OperationStatus is Nothing(): currently running + CommitTransaction(); + } + } else if (operationStatus) { + OperationStatus = static_cast<Ydb::StatusIds::StatusCode>(*operationStatus); + TMaybe<i32> executionStatus = result.ColumnParser("execution_status").GetOptionalInt32(); + if (executionStatus) { + ExecutionStatus = static_cast<Ydb::Query::ExecStatus>(*executionStatus); + } + const TMaybe<TString> issuesSerialized = result.ColumnParser("issues").GetOptionalJsonDocument(); + if (issuesSerialized) { + OperationIssues = DeserializeIssues(*issuesSerialized); + } + CommitTransaction(); + } else { + Finish(Ydb::StatusIds::INTERNAL_ERROR, "Invalid operation state"); + } + } + + void OnFinishScriptExecution() { + OperationStatus = StatusOnExpiredLease; + ExecutionStatus = Ydb::Query::EXEC_STATUS_ABORTED; + OperationIssues = LeaseExpiredIssues(); + Finish(); + } + + void OnFinish(Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues) override { + if (status == Ydb::StatusIds::SUCCESS) { + Send(Owner, new TEvPrivate::TEvLeaseCheckResult(OperationStatus, ExecutionStatus, std::move(OperationIssues)), 0, Cookie); + } else { + Send(Owner, new TEvPrivate::TEvLeaseCheckResult(status, std::move(issues)), 0, Cookie); + } + } + +private: + const TInstant RunStartTime = TInstant::Now(); + const TString Database; + const TString ExecutionId; + const Ydb::StatusIds::StatusCode StatusOnExpiredLease; + const ui64 Cookie; + TMaybe<Ydb::StatusIds::StatusCode> OperationStatus; + TMaybe<Ydb::Query::ExecStatus> ExecutionStatus; + TMaybe<NYql::TIssues> OperationIssues; +}; + +class TGetScriptExecutionOperationActor : public TScriptExecutionFinisherBase { public: explicit TGetScriptExecutionOperationActor(TEvGetScriptExecutionOperation::TPtr ev) : Request(std::move(ev)) @@ -723,10 +851,16 @@ public: issues FROM `.metadata/script_executions` WHERE database = $database AND execution_id = $execution_id; + + SELECT + lease_deadline + FROM `.metadata/script_execution_leases` + WHERE database = $database AND execution_id = $execution_id; )"; TMaybe<TString> maybeExecutionId = ScriptExecutionFromOperation(Request->Get()->OperationId); Y_ENSURE(maybeExecutionId, "No execution id specified"); + ExecutionId = *maybeExecutionId; NYdb::TParamsBuilder params; params @@ -734,14 +868,15 @@ public: .Utf8(Request->Get()->Database) .Build() .AddParam("$execution_id") - .Utf8(*maybeExecutionId) + .Utf8(ExecutionId) .Build(); - RunDataQuery(sql, ¶ms); + RunDataQuery(sql, ¶ms, TTxControl::BeginTx()); + SetQueryResultHandler(&TGetScriptExecutionOperationActor::OnGetInfo); } - void OnQueryResult() override { - if (ResultSets.size() != 1) { + void OnGetInfo() { + if (ResultSets.size() != 2) { Finish(Ydb::StatusIds::INTERNAL_ERROR, "Unexpected database response"); return; } @@ -783,31 +918,72 @@ public: } const TMaybe<TString> issuesSerialized = result.ColumnParser("issues").GetOptionalJsonDocument(); - NYql::TIssues issues; if (issuesSerialized) { - issues = DeserializeIssues(*issuesSerialized); + Issues = DeserializeIssues(*issuesSerialized); + } + + bool finishing = false; + if (!operationStatus) { + // Check lease deadline + NYdb::TResultSetParser deadlineResult(ResultSets[1]); + if (deadlineResult.RowsCount() == 0) { + Finish(Ydb::StatusIds::INTERNAL_ERROR, "Unexpected operation state"); + return; + } + + deadlineResult.TryNextRow(); + + TMaybe<TInstant> leaseDeadline = deadlineResult.ColumnParser(0).GetOptionalTimestamp(); + if (!leaseDeadline) { + Finish(Ydb::StatusIds::INTERNAL_ERROR, "Unexpected operation state"); + return; + } + + if (*leaseDeadline < TInstant::Now()) { + finishing = true; + + metadata.set_exec_status(Ydb::Query::EXEC_STATUS_ABORTED); + Ready = true; + Issues = LeaseExpiredIssues(); + + FinishScriptExecution(Request->Get()->Database, ExecutionId, Ydb::StatusIds::ABORTED, Ydb::Query::EXEC_STATUS_ABORTED, Issues); + SetQueryResultHandler(&TGetScriptExecutionOperationActor::OnFinishOperation); + } } Metadata.ConstructInPlace().PackFrom(metadata); - Finish(Ydb::StatusIds::SUCCESS, std::move(issues)); + if (!finishing) { + CommitTransaction(); + } + } + + void OnFinishOperation() { + Finish(Ydb::StatusIds::SUCCESS, std::move(Issues)); } void OnFinish(Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues) override { - Send(Request->Sender, new TEvGetScriptExecutionOperationResponse(Ready, status, std::move(issues), std::move(Metadata))); + if (status == Ydb::StatusIds::SUCCESS) { + Send(Request->Sender, new TEvGetScriptExecutionOperationResponse(Ready, status, std::move(Issues), std::move(Metadata))); + } else { + Send(Request->Sender, new TEvGetScriptExecutionOperationResponse(false, status, std::move(issues), Nothing())); + } } private: TEvGetScriptExecutionOperation::TPtr Request; + TString ExecutionId; bool Ready = false; + NYql::TIssues Issues; TMaybe<google::protobuf::Any> Metadata; }; -class TListScriptExecutionOperationsActor : public TQueryBase { +class TListScriptExecutionOperationsQuery : public TQueryBase { public: - TListScriptExecutionOperationsActor(TEvListScriptExecutionOperations::TPtr ev) - : Request(std::move(ev)) - , PageSize(ClampVal<ui64>(Request->Get()->PageSize, 1, 500)) + TListScriptExecutionOperationsQuery(const TString& database, const TString& pageToken, ui64 pageSize) + : Database(database) + , PageToken(pageToken) + , PageSize(pageSize) {} static std::pair<TInstant, TString> ParsePageToken(const TString& token) { @@ -825,7 +1001,7 @@ public: void OnRunQuery() override { TStringBuilder sql; - if (Request->Get()->PageToken) { + if (PageToken) { sql << R"( DECLARE $execution_id AS Text; DECLARE $ts AS Timestamp; @@ -847,7 +1023,7 @@ public: FROM `.metadata/script_executions` WHERE database = $database )"; - if (Request->Get()->PageToken) { + if (PageToken) { sql << R"( AND (start_ts, execution_id) <= ($ts, $execution_id) )"; @@ -860,14 +1036,14 @@ public: NYdb::TParamsBuilder params; params .AddParam("$database") - .Utf8(Request->Get()->Database) + .Utf8(Database) .Build() .AddParam("$page_size") .Uint64(PageSize + 1) .Build(); - if (Request->Get()->PageToken) { - auto pageTokenParts = ParsePageToken(Request->Get()->PageToken); + if (PageToken) { + auto pageTokenParts = ParsePageToken(PageToken); params .AddParam("$ts") .Timestamp(pageTokenParts.first) @@ -955,106 +1131,95 @@ public: } void OnFinish(Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues) override { - Send(Request->Sender, new TEvListScriptExecutionOperationsResponse(status, std::move(issues), NextPageToken, std::move(Operations))); + Send(Owner, new TEvListScriptExecutionOperationsResponse(status, std::move(issues), NextPageToken, std::move(Operations))); } private: - TEvListScriptExecutionOperations::TPtr Request; - ui64 PageSize = 0; + const TString Database; + const TString PageToken; + const ui64 PageSize; TString NextPageToken; std::vector<Ydb::Operations::Operation> Operations; }; -class TCheckLeaseStatusActor : public TScriptExecutionFinisherBase { +class TListScriptExecutionOperationsActor : public TActorBootstrapped<TListScriptExecutionOperationsActor> { public: - TCheckLeaseStatusActor(const TString& database, const TString& executionId, Ydb::StatusIds::StatusCode statusOnExpiredLease = Ydb::StatusIds::ABORTED) - : Database(database) - , ExecutionId(executionId) - , StatusOnExpiredLease(statusOnExpiredLease) + TListScriptExecutionOperationsActor(TEvListScriptExecutionOperations::TPtr ev) + : Request(std::move(ev)) {} - void OnRunQuery() override { - const TString sql = R"( - DECLARE $database AS Text; - DECLARE $execution_id AS Text; - - SELECT operation_status FROM `.metadata/script_executions` - WHERE database = $database AND execution_id = $execution_id; - - SELECT lease_deadline FROM `.metadata/script_execution_leases` - WHERE database = $database AND execution_id = $execution_id; - )"; - - NYdb::TParamsBuilder params; - params - .AddParam("$database") - .Utf8(Database) - .Build() - .AddParam("$execution_id") - .Utf8(ExecutionId) - .Build(); + void Bootstrap() { + const ui64 pageSize = ClampVal<ui64>(Request->Get()->PageSize, 1, 100); + Register(new TListScriptExecutionOperationsQuery(Request->Get()->Database, Request->Get()->PageToken, pageSize)); - RunDataQuery(sql, ¶ms, TTxControl::BeginTx()); + Become(&TListScriptExecutionOperationsActor::StateFunc); } - void OnQueryResult() override { - if (!FinishWasRun) { - if (ResultSets.size() != 2) { - Finish(Ydb::StatusIds::INTERNAL_ERROR, "Unexpected database response"); - return; - } - NYdb::TResultSetParser result(ResultSets[0]); - if (result.RowsCount() == 0) { - Finish(Ydb::StatusIds::BAD_REQUEST, "No such execution"); - return; - } - - result.TryNextRow(); + STRICT_STFUNC(StateFunc, + hFunc(TEvListScriptExecutionOperationsResponse, Handle); + hFunc(TEvPrivate::TEvLeaseCheckResult, Handle); + ) - TMaybe<i32> operationStatus = result.ColumnParser(0).GetOptionalInt32(); - TMaybe<TInstant> leaseDeadline; + void Handle(TEvListScriptExecutionOperationsResponse::TPtr& ev) { + Response = std::move(ev); - NYdb::TResultSetParser result2(ResultSets[1]); + for (ui64 i = 0; i < Response->Get()->Operations.size(); ++i) { + const Ydb::Operations::Operation& op = Response->Get()->Operations[i]; + if (!op.ready()) { + Ydb::Query::ExecuteScriptMetadata metadata; + op.metadata().UnpackTo(&metadata); + Register(new TCheckLeaseStatusActor(Request->Get()->Database, metadata.execution_id(), Ydb::StatusIds::ABORTED, i)); + ++OperationsToCheck; + } + } - if (result2.RowsCount() > 0) { - result2.TryNextRow(); + if (OperationsToCheck == 0) { + Reply(); + } + } - leaseDeadline = result2.ColumnParser(0).GetOptionalTimestamp(); - } + void Handle(TEvPrivate::TEvLeaseCheckResult::TPtr& ev) { + Y_VERIFY(ev->Cookie < Response->Get()->Operations.size()); + + if (ev->Get()->Status != Ydb::StatusIds::SUCCESS) { + Response->Get()->Status = ev->Get()->Status; + Response->Get()->Issues = std::move(ev->Get()->Issues); + Response->Get()->NextPageToken.clear(); + Response->Get()->Operations.clear(); + Reply(); + return; + } - if (leaseDeadline) { - if (operationStatus) { - Finish(Ydb::StatusIds::INTERNAL_ERROR, "Invalid operation state"); - } else if (*leaseDeadline < RunStartTime) { - FinishWasRun = true; - FinishScriptExecution(Database, ExecutionId, StatusOnExpiredLease, Ydb::Query::EXEC_STATUS_ABORTED, "Lease expired"); - } else { - // OperationStatus is Nothing(): currently running - Finish(); + if (ev->Get()->OperationStatus) { + Ydb::Operations::Operation& op = Response->Get()->Operations[ev->Cookie]; + op.set_status(*ev->Get()->OperationStatus); + Ydb::Query::ExecuteScriptMetadata metadata; + op.metadata().UnpackTo(&metadata); + Y_VERIFY(ev->Get()->ExecutionStatus); + metadata.set_exec_status(*ev->Get()->ExecutionStatus); + op.mutable_metadata()->PackFrom(metadata); + if (ev->Get()->OperationIssues) { + for (const NYql::TIssue& issue : *ev->Get()->OperationIssues) { + NYql::IssueToMessage(issue, op.add_issues()); } - } else if (operationStatus) { - OperationStatus = static_cast<Ydb::StatusIds::StatusCode>(*operationStatus); - Finish(); - } else { - Finish(Ydb::StatusIds::INTERNAL_ERROR, "Invalid operation state"); } - } else { - OperationStatus = StatusOnExpiredLease; - Finish(); + } + + --OperationsToCheck; + if (OperationsToCheck == 0) { + Reply(); } } - void OnFinish(Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues) override { - Send(Owner, new TEvPrivate::TEvLeaseCheckResult(status, std::move(issues), OperationStatus)); + void Reply() { + Send(Request->Sender, Response->Release().Release()); + PassAway(); } private: - const TInstant RunStartTime = TInstant::Now(); - const TString Database; - const TString ExecutionId; - const Ydb::StatusIds::StatusCode StatusOnExpiredLease; - TMaybe<Ydb::StatusIds::StatusCode> OperationStatus; - bool FinishWasRun = false; + TEvListScriptExecutionOperations::TPtr Request; + TEvListScriptExecutionOperationsResponse::TPtr Response; + ui64 OperationsToCheck = 0; }; class TCancelScriptExecutionOperationActor : public NActors::TActorBootstrapped<TCancelScriptExecutionOperationActor> { @@ -1121,12 +1286,12 @@ public: if (ev->Get()->Reason == NActors::TEvents::TEvUndelivered::ReasonActorUnknown) { // The actor probably had finished before our cancel message arrived. Register(new TCheckLeaseStatusActor(Request->Get()->Database, ExecutionId)); // Check if the operation has finished. } else { - Reply(Ydb::StatusIds::UNAVAILABLE, "Failed to deliver request to destination"); + Reply(Ydb::StatusIds::UNAVAILABLE, "Failed to deliver cancel request to destination"); } } void Handle(NActors::TEvInterconnect::TEvNodeDisconnected::TPtr&) { - Reply(Ydb::StatusIds::UNAVAILABLE, "Failed to deliver request to destination"); + Reply(Ydb::StatusIds::UNAVAILABLE, "Failed to deliver cancel request to destination"); } void Reply(Ydb::StatusIds::StatusCode status, NYql::TIssues issues = {}) { diff --git a/ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp b/ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp index d440065241..0b2f808a22 100644 --- a/ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp +++ b/ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp @@ -36,6 +36,7 @@ class TRunScriptActor : public NActors::TActorBootstrapped<TRunScriptActor> { Running, Cancelling, Cancelled, + Finishing, Finished, }; public: @@ -147,7 +148,7 @@ private: Send(ev->Sender, resp.Release()); - if (!IsFinished() && !IsTruncated()) { + if (IsExecuting() && !IsTruncated()) { MergeResultSet(ev); } } @@ -166,7 +167,7 @@ private: void Handle(TEvKqp::TEvFetchScriptResultsRequest::TPtr& ev) { auto resp = MakeHolder<TEvKqp::TEvFetchScriptResultsResponse>(); - if (!IsFinished()) { + if (IsExecuting()) { if (RunState == ERunState::Created || RunState == ERunState::Running) { resp->Record.SetStatus(Ydb::StatusIds::BAD_REQUEST); resp->Record.AddIssues()->set_message("Results are not ready"); @@ -215,6 +216,9 @@ private: case ERunState::Cancelled: Send(ev->Sender, new TEvKqp::TEvCancelScriptExecutionResponse(Ydb::StatusIds::PRECONDITION_FAILED, "Already cancelled")); break; + case ERunState::Finishing: + CancelRequests.emplace_front(std::move(ev)); + break; case ERunState::Finished: Send(ev->Sender, new TEvKqp::TEvCancelScriptExecutionResponse(Ydb::StatusIds::PRECONDITION_FAILED, "Already finished")); break; @@ -240,11 +244,23 @@ private: void Handle(TEvScriptExecutionFinished::TPtr& ev) { if (RunState == ERunState::Cancelling) { RunState = ERunState::Cancelled; - for (auto& req : CancelRequests) { - Send(req->Sender, new TEvKqp::TEvCancelScriptExecutionResponse(ev->Get()->Status, ev->Get()->Issues)); - } - CancelRequests.clear(); } + + if (RunState == ERunState::Finishing) { + RunState = ERunState::Finished; + } + + Ydb::StatusIds::StatusCode status = ev->Get()->Status; + NYql::TIssues issues = std::move(ev->Get()->Issues); + if (RunState == ERunState::Finished) { + status = Ydb::StatusIds::PRECONDITION_FAILED; + issues.Clear(); + issues.AddIssue("Already finished"); + } + for (auto& req : CancelRequests) { + Send(req->Sender, new TEvKqp::TEvCancelScriptExecutionResponse(status, issues)); + } + CancelRequests.clear(); } void MergeResultSet(TEvKqpExecuter::TEvStreamData::TPtr& ev) { @@ -281,7 +297,7 @@ private: } } - void Finish(Ydb::StatusIds::StatusCode status, ERunState runState = ERunState::Finished) { + void Finish(Ydb::StatusIds::StatusCode status, ERunState runState = ERunState::Finishing) { RunState = runState; Status = status; Register(CreateScriptExecutionFinisher(ActorIdToScriptExecutionId(SelfId()), Database, LeaseGeneration, status, GetExecStatusFromStatusCode(status), Issues)); @@ -297,8 +313,11 @@ private: NActors::TActorBootstrapped<TRunScriptActor>::PassAway(); } - bool IsFinished() const { - return RunState == ERunState::Finished; + bool IsExecuting() const { + return RunState != ERunState::Finished + && RunState != ERunState::Finishing + && RunState != ERunState::Cancelled + && RunState != ERunState::Cancelling; } bool IsTruncated() const { diff --git a/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp b/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp index e73b0b6017..75719c99f5 100644 --- a/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp +++ b/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp @@ -327,7 +327,8 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { } auto op = opClient.Get<NYdb::NQuery::TScriptExecutionOperation>(scriptExecutionOperation.Id()).ExtractValueSync(); - UNIT_ASSERT(op.Ready()); + UNIT_ASSERT_C(op.Status().IsSuccess(), op.Status().GetIssues().ToString()); + UNIT_ASSERT_C(op.Ready(), op.Status().GetIssues().ToString()); UNIT_ASSERT(op.Metadata().ExecStatus == EExecStatus::Completed || op.Metadata().ExecStatus == EExecStatus::Canceled); UNIT_ASSERT_EQUAL(op.Metadata().ExecutionId, scriptExecutionOperation.Metadata().ExecutionId); UNIT_ASSERT(op.Status().GetStatus() == NYdb::EStatus::SUCCESS || op.Status().GetStatus() == NYdb::EStatus::CANCELLED); diff --git a/ydb/library/query_actor/query_actor.cpp b/ydb/library/query_actor/query_actor.cpp index af28f8ce94..36d26a4b5c 100644 --- a/ydb/library/query_actor/query_actor.cpp +++ b/ydb/library/query_actor/query_actor.cpp @@ -108,6 +108,17 @@ TQueryBase::TEvQueryBasePrivate::TEvRollbackTransactionResponse::TEvRollbackTran { } +TQueryBase::TEvQueryBasePrivate::TEvCommitTransactionResponse::TEvCommitTransactionResponse(Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues) + : Status(status) + , Issues(std::move(issues)) +{ +} + +TQueryBase::TEvQueryBasePrivate::TEvCommitTransactionResponse::TEvCommitTransactionResponse(const Ydb::Table::CommitTransactionResponse& resp) + : TEvCommitTransactionResponse(resp.operation().status(), IssuesFromOperation(resp.operation())) +{ +} + TQueryBase::TQueryBase(ui64 logComponent, TString sessionId, TString database) : LogComponent(logComponent) , Database(std::move(database)) @@ -125,6 +136,7 @@ STRICT_STFUNC(TQueryBase::StateFunc, hFunc(TEvQueryBasePrivate::TEvCreateSessionResult, Handle); hFunc(TEvQueryBasePrivate::TEvDeleteSessionResult, Handle); hFunc(TEvQueryBasePrivate::TEvRollbackTransactionResponse, Handle); + hFunc(TEvQueryBasePrivate::TEvCommitTransactionResponse, Handle); ); void TQueryBase::Bootstrap() { @@ -172,11 +184,11 @@ void TQueryBase::Handle(TEvQueryBasePrivate::TEvDataQueryResult::TPtr& ev) { ResultSets.emplace_back(std::move(resultSet)); } try { - OnQueryResult(); + (this->*QueryResultHandler)(); } catch (const std::exception& ex) { Finish(Ydb::StatusIds::INTERNAL_ERROR, ex.what()); } - Y_VERIFY(Finished || RunningQuery); + Y_VERIFY(Finished || RunningQuery || RunningCommit); } else { Finish(ev->Get()->Status, std::move(ev->Get()->Issues)); } @@ -193,6 +205,18 @@ void TQueryBase::Handle(TEvQueryBasePrivate::TEvRollbackTransactionResponse::TPt } } +void TQueryBase::Handle(TEvQueryBasePrivate::TEvCommitTransactionResponse::TPtr& ev) { + LOG_D("CommitTransactionResult: " << ev->Get()->Status << ". Issues: " << ev->Get()->Issues.ToOneLineString()); + + OnFinish(ev->Get()->Status, std::move(ev->Get()->Issues)); + + if (DeleteSession) { + RunDeleteSession(); + } else { + PassAway(); + } +} + void TQueryBase::Finish(Ydb::StatusIds::StatusCode status, const TString& message, bool rollbackOnError) { NYql::TIssues issues; issues.AddIssue(message); @@ -278,4 +302,21 @@ void TQueryBase::RollbackTransaction() { Subscribe<Ydb::Table::RollbackTransactionResponse, TEvQueryBasePrivate::TEvRollbackTransactionResponse>(NRpcService::DoLocalRpc<TEvRollbackTransactionRequest>(std::move(req), Database, Nothing(), TActivationContext::ActorSystem(), true)); } +void TQueryBase::CommitTransaction() { + RunningCommit = true; + Y_VERIFY(SessionId); + Y_VERIFY(TxId); + LOG_D("Commit transaction: " << TxId); + using TEvCommitTransactionRequest = NGRpcService::TGrpcRequestOperationCall<Ydb::Table::CommitTransactionRequest, + Ydb::Table::CommitTransactionResponse>; + Ydb::Table::CommitTransactionRequest req; + req.set_session_id(SessionId); + req.set_tx_id(TxId); + Subscribe<Ydb::Table::CommitTransactionResponse, TEvQueryBasePrivate::TEvCommitTransactionResponse>(NRpcService::DoLocalRpc<TEvCommitTransactionRequest>(std::move(req), Database, Nothing(), TActivationContext::ActorSystem(), true)); +} + +void TQueryBase::CallOnQueryResult() { + OnQueryResult(); +} + } // namespace NKikimr diff --git a/ydb/library/query_actor/query_actor.h b/ydb/library/query_actor/query_actor.h index 10f69a492a..a176ddead2 100644 --- a/ydb/library/query_actor/query_actor.h +++ b/ydb/library/query_actor/query_actor.h @@ -31,6 +31,8 @@ protected: bool Continue = false; }; + using TQueryResultHandler = void (TQueryBase::*)(); + private: struct TEvQueryBasePrivate { // Event ids @@ -39,6 +41,7 @@ private: EvCreateSessionResult, EvDeleteSessionResult, EvRollbackTransactionResponse, + EvCommitTransactionResponse, EvEnd }; @@ -81,6 +84,14 @@ private: Ydb::StatusIds::StatusCode Status; NYql::TIssues Issues; }; + + struct TEvCommitTransactionResponse : public NActors::TEventLocal<TEvCommitTransactionResponse, EvCommitTransactionResponse> { + TEvCommitTransactionResponse(Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues); + TEvCommitTransactionResponse(const Ydb::Table::CommitTransactionResponse& resp); + + Ydb::StatusIds::StatusCode Status; + NYql::TIssues Issues; + }; }; public: @@ -99,11 +110,17 @@ protected: void Finish(); void RunDataQuery(const TString& sql, NYdb::TParamsBuilder* params = nullptr, TTxControl txControl = TTxControl::BeginAndCommitTx()); + void CommitTransaction(); + + template <class THandlerFunc> + void SetQueryResultHandler(THandlerFunc handler) { + QueryResultHandler = static_cast<TQueryResultHandler>(handler); + } private: // Methods for implementing in derived classes. virtual void OnRunQuery() = 0; - virtual void OnQueryResult() = 0; // Must either run next query or finish + virtual void OnQueryResult() {} // Must either run next query or finish virtual void OnFinish(Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues) = 0; private: @@ -124,12 +141,15 @@ private: void Handle(TEvQueryBasePrivate::TEvDeleteSessionResult::TPtr& ev); void Handle(TEvQueryBasePrivate::TEvDataQueryResult::TPtr& ev); void Handle(TEvQueryBasePrivate::TEvRollbackTransactionResponse::TPtr& ev); + void Handle(TEvQueryBasePrivate::TEvCommitTransactionResponse::TPtr& ev); void RunQuery(); void RunCreateSession(); void RunDeleteSession(); void RollbackTransaction(); + void CallOnQueryResult(); + protected: const ui64 LogComponent; const TString Database; @@ -137,9 +157,12 @@ protected: TString TxId; bool DeleteSession = false; bool RunningQuery = false; + bool RunningCommit = false; bool Finished = false; bool CommitRequested = false; + TQueryResultHandler QueryResultHandler = &TQueryBase::CallOnQueryResult; + NActors::TActorId Owner; std::vector<NYdb::TResultSet> ResultSets; diff --git a/ydb/library/query_actor/query_actor_ut.cpp b/ydb/library/query_actor/query_actor_ut.cpp index 131847cedb..b0b072c175 100644 --- a/ydb/library/query_actor/query_actor_ut.cpp +++ b/ydb/library/query_actor/query_actor_ut.cpp @@ -164,6 +164,28 @@ Y_UNIT_TEST_SUITE(QueryActorTest) { assertValues(); } + + Y_UNIT_TEST(Commit) { + TTestServer server; + + struct TSelectQuery : public TTestQueryActorBase { + void OnRunQuery() override { + RunDataQuery("SELECT * FROM TestTable", nullptr, TTxControl::BeginTx()); + SetQueryResultHandler(&TSelectQuery::MyResultHandler); + } + + void MyResultHandler() { + CommitTransaction(); // Finish will be after successful commit + } + + void OnQueryResult() override { + UNIT_ASSERT(false); + } + }; + + auto result = server.RunQueryActor<TSelectQuery>(); + UNIT_ASSERT_VALUES_EQUAL(result.StatusCode, Ydb::StatusIds::SUCCESS); + } } } // namespace NKikimr |