aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorgalaxycrab <UgnineSirdis@ydb.tech>2023-06-09 15:06:01 +0300
committergalaxycrab <UgnineSirdis@ydb.tech>2023-06-09 15:06:01 +0300
commit771b2d98e7a5045e038d42923b65879135603368 (patch)
tree8028d2ec43207b4eb7a5c19bb3a2602f46cfb90b
parentbe521d3ff04fca2a5fc67baad12a6e9b1da6b773 (diff)
downloadydb-771b2d98e7a5045e038d42923b65879135603368.tar.gz
Properly process dead leases for script execution operations
-rw-r--r--ydb/core/grpc_services/query/rpc_fetch_script_results.cpp4
-rw-r--r--ydb/core/kqp/proxy_service/kqp_script_executions.cpp361
-rw-r--r--ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp37
-rw-r--r--ydb/core/kqp/ut/service/kqp_query_service_ut.cpp3
-rw-r--r--ydb/library/query_actor/query_actor.cpp45
-rw-r--r--ydb/library/query_actor/query_actor.h25
-rw-r--r--ydb/library/query_actor/query_actor_ut.cpp22
7 files changed, 384 insertions, 113 deletions
diff --git a/ydb/core/grpc_services/query/rpc_fetch_script_results.cpp b/ydb/core/grpc_services/query/rpc_fetch_script_results.cpp
index 68247a6153..3563c371f4 100644
--- a/ydb/core/grpc_services/query/rpc_fetch_script_results.cpp
+++ b/ydb/core/grpc_services/query/rpc_fetch_script_results.cpp
@@ -97,12 +97,12 @@ private:
if (ev->Get()->Reason == NActors::TEvents::TEvUndelivered::ReasonActorUnknown) {
Reply(Ydb::StatusIds::NOT_FOUND, "No such execution");
} else {
- Reply(Ydb::StatusIds::UNAVAILABLE, "Failed to deliver request to destination");
+ Reply(Ydb::StatusIds::UNAVAILABLE, "Failed to deliver fetch request to destination");
}
}
void Handle(NActors::TEvInterconnect::TEvNodeDisconnected::TPtr&) {
- Reply(Ydb::StatusIds::UNAVAILABLE, "Failed to deliver request to destination");
+ Reply(Ydb::StatusIds::UNAVAILABLE, "Failed to deliver fetch request to destination");
}
void PassAway() override {
diff --git a/ydb/core/kqp/proxy_service/kqp_script_executions.cpp b/ydb/core/kqp/proxy_service/kqp_script_executions.cpp
index a0a3677d37..4a7cbaff4d 100644
--- a/ydb/core/kqp/proxy_service/kqp_script_executions.cpp
+++ b/ydb/core/kqp/proxy_service/kqp_script_executions.cpp
@@ -99,16 +99,26 @@ struct TEvPrivate {
};
struct TEvLeaseCheckResult : public NActors::TEventLocal<TEvLeaseCheckResult, EvLeaseCheckResult> {
- TEvLeaseCheckResult(Ydb::StatusIds::StatusCode statusCode, NYql::TIssues&& issues, TMaybe<Ydb::StatusIds::StatusCode> operationStatus)
+ TEvLeaseCheckResult(Ydb::StatusIds::StatusCode statusCode, NYql::TIssues&& issues)
: Status(statusCode)
, Issues(std::move(issues))
- , OperationStatus(operationStatus)
{
}
+ TEvLeaseCheckResult(TMaybe<Ydb::StatusIds::StatusCode> operationStatus,
+ TMaybe<Ydb::Query::ExecStatus> executionStatus,
+ TMaybe<NYql::TIssues> operationIssues)
+ : Status(Ydb::StatusIds::SUCCESS)
+ , OperationStatus(operationStatus)
+ , ExecutionStatus(executionStatus)
+ , OperationIssues(operationIssues)
+ {}
+
const Ydb::StatusIds::StatusCode Status;
const NYql::TIssues Issues;
const TMaybe<Ydb::StatusIds::StatusCode> OperationStatus;
+ const TMaybe<Ydb::Query::ExecStatus> ExecutionStatus;
+ const TMaybe<NYql::TIssues> OperationIssues;
};
};
@@ -567,7 +577,7 @@ class TScriptExecutionFinisherBase : public TQueryBase {
public:
using TQueryBase::TQueryBase;
- void FinishScriptExecution(const TString& database, const TString& executionId, Ydb::StatusIds::StatusCode operationStatus, Ydb::Query::ExecStatus execStatus, const NYql::TIssues& issues, TTxControl txControl = TTxControl::ContinueAndCommitTx()) {
+ void FinishScriptExecution(const TString& database, const TString& executionId, Ydb::StatusIds::StatusCode operationStatus, Ydb::Query::ExecStatus execStatus, const NYql::TIssues& issues = LeaseExpiredIssues(), TTxControl txControl = TTxControl::ContinueAndCommitTx()) {
TString sql = R"(
DECLARE $database AS Text;
DECLARE $execution_id AS Text;
@@ -609,9 +619,17 @@ public:
}
void FinishScriptExecution(const TString& database, const TString& executionId, Ydb::StatusIds::StatusCode operationStatus, Ydb::Query::ExecStatus execStatus, const TString& message, TTxControl txControl = TTxControl::ContinueAndCommitTx()) {
+ FinishScriptExecution(database, executionId, operationStatus, execStatus, IssuesFromMessage(message), txControl);
+ }
+
+ static NYql::TIssues IssuesFromMessage(const TString& message) {
NYql::TIssues issues;
issues.AddIssue(message);
- FinishScriptExecution(database, executionId, operationStatus, execStatus, issues, txControl);
+ return issues;
+ }
+
+ static NYql::TIssues LeaseExpiredIssues() {
+ return IssuesFromMessage("Lease expired");
}
};
@@ -702,7 +720,117 @@ private:
bool FinishWasRun = false;
};
-class TGetScriptExecutionOperationActor : public TQueryBase {
+class TCheckLeaseStatusActor : public TScriptExecutionFinisherBase {
+public:
+ TCheckLeaseStatusActor(const TString& database, const TString& executionId, Ydb::StatusIds::StatusCode statusOnExpiredLease = Ydb::StatusIds::ABORTED, ui64 cookie = 0)
+ : Database(database)
+ , ExecutionId(executionId)
+ , StatusOnExpiredLease(statusOnExpiredLease)
+ , Cookie(cookie)
+ {}
+
+ void OnRunQuery() override {
+ const TString sql = R"(
+ DECLARE $database AS Text;
+ DECLARE $execution_id AS Text;
+
+ SELECT operation_status, execution_status, issues FROM `.metadata/script_executions`
+ WHERE database = $database AND execution_id = $execution_id;
+
+ SELECT lease_deadline FROM `.metadata/script_execution_leases`
+ WHERE database = $database AND execution_id = $execution_id;
+ )";
+
+ NYdb::TParamsBuilder params;
+ params
+ .AddParam("$database")
+ .Utf8(Database)
+ .Build()
+ .AddParam("$execution_id")
+ .Utf8(ExecutionId)
+ .Build();
+
+ RunDataQuery(sql, &params, TTxControl::BeginTx());
+ SetQueryResultHandler(&TCheckLeaseStatusActor::OnResult);
+ }
+
+ void OnResult() {
+ if (ResultSets.size() != 2) {
+ Finish(Ydb::StatusIds::INTERNAL_ERROR, "Unexpected database response");
+ return;
+ }
+ NYdb::TResultSetParser result(ResultSets[0]);
+ if (result.RowsCount() == 0) {
+ Finish(Ydb::StatusIds::BAD_REQUEST, "No such execution");
+ return;
+ }
+
+ result.TryNextRow();
+
+ TMaybe<i32> operationStatus = result.ColumnParser("operation_status").GetOptionalInt32();
+ TMaybe<TInstant> leaseDeadline;
+
+ NYdb::TResultSetParser result2(ResultSets[1]);
+
+ if (result2.RowsCount() > 0) {
+ result2.TryNextRow();
+
+ leaseDeadline = result2.ColumnParser(0).GetOptionalTimestamp();
+ }
+
+ if (leaseDeadline) {
+ if (operationStatus) {
+ Finish(Ydb::StatusIds::INTERNAL_ERROR, "Invalid operation state");
+ } else if (*leaseDeadline < RunStartTime) {
+ FinishScriptExecution(Database, ExecutionId, StatusOnExpiredLease, Ydb::Query::EXEC_STATUS_ABORTED);
+ SetQueryResultHandler(&TCheckLeaseStatusActor::OnFinishScriptExecution);
+ } else {
+ // OperationStatus is Nothing(): currently running
+ CommitTransaction();
+ }
+ } else if (operationStatus) {
+ OperationStatus = static_cast<Ydb::StatusIds::StatusCode>(*operationStatus);
+ TMaybe<i32> executionStatus = result.ColumnParser("execution_status").GetOptionalInt32();
+ if (executionStatus) {
+ ExecutionStatus = static_cast<Ydb::Query::ExecStatus>(*executionStatus);
+ }
+ const TMaybe<TString> issuesSerialized = result.ColumnParser("issues").GetOptionalJsonDocument();
+ if (issuesSerialized) {
+ OperationIssues = DeserializeIssues(*issuesSerialized);
+ }
+ CommitTransaction();
+ } else {
+ Finish(Ydb::StatusIds::INTERNAL_ERROR, "Invalid operation state");
+ }
+ }
+
+ void OnFinishScriptExecution() {
+ OperationStatus = StatusOnExpiredLease;
+ ExecutionStatus = Ydb::Query::EXEC_STATUS_ABORTED;
+ OperationIssues = LeaseExpiredIssues();
+ Finish();
+ }
+
+ void OnFinish(Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues) override {
+ if (status == Ydb::StatusIds::SUCCESS) {
+ Send(Owner, new TEvPrivate::TEvLeaseCheckResult(OperationStatus, ExecutionStatus, std::move(OperationIssues)), 0, Cookie);
+ } else {
+ Send(Owner, new TEvPrivate::TEvLeaseCheckResult(status, std::move(issues)), 0, Cookie);
+ }
+ }
+
+private:
+ const TInstant RunStartTime = TInstant::Now();
+ const TString Database;
+ const TString ExecutionId;
+ const Ydb::StatusIds::StatusCode StatusOnExpiredLease;
+ const ui64 Cookie;
+ TMaybe<Ydb::StatusIds::StatusCode> OperationStatus;
+ TMaybe<Ydb::Query::ExecStatus> ExecutionStatus;
+ TMaybe<NYql::TIssues> OperationIssues;
+};
+
+class TGetScriptExecutionOperationActor : public TScriptExecutionFinisherBase {
public:
explicit TGetScriptExecutionOperationActor(TEvGetScriptExecutionOperation::TPtr ev)
: Request(std::move(ev))
@@ -723,10 +851,16 @@ public:
issues
FROM `.metadata/script_executions`
WHERE database = $database AND execution_id = $execution_id;
+
+ SELECT
+ lease_deadline
+ FROM `.metadata/script_execution_leases`
+ WHERE database = $database AND execution_id = $execution_id;
)";
TMaybe<TString> maybeExecutionId = ScriptExecutionFromOperation(Request->Get()->OperationId);
Y_ENSURE(maybeExecutionId, "No execution id specified");
+ ExecutionId = *maybeExecutionId;
NYdb::TParamsBuilder params;
params
@@ -734,14 +868,15 @@ public:
.Utf8(Request->Get()->Database)
.Build()
.AddParam("$execution_id")
- .Utf8(*maybeExecutionId)
+ .Utf8(ExecutionId)
.Build();
- RunDataQuery(sql, &params);
+ RunDataQuery(sql, &params, TTxControl::BeginTx());
+ SetQueryResultHandler(&TGetScriptExecutionOperationActor::OnGetInfo);
}
- void OnQueryResult() override {
- if (ResultSets.size() != 1) {
+ void OnGetInfo() {
+ if (ResultSets.size() != 2) {
Finish(Ydb::StatusIds::INTERNAL_ERROR, "Unexpected database response");
return;
}
@@ -783,31 +918,72 @@ public:
}
const TMaybe<TString> issuesSerialized = result.ColumnParser("issues").GetOptionalJsonDocument();
- NYql::TIssues issues;
if (issuesSerialized) {
- issues = DeserializeIssues(*issuesSerialized);
+ Issues = DeserializeIssues(*issuesSerialized);
+ }
+
+ bool finishing = false;
+ if (!operationStatus) {
+ // Check lease deadline
+ NYdb::TResultSetParser deadlineResult(ResultSets[1]);
+ if (deadlineResult.RowsCount() == 0) {
+ Finish(Ydb::StatusIds::INTERNAL_ERROR, "Unexpected operation state");
+ return;
+ }
+
+ deadlineResult.TryNextRow();
+
+ TMaybe<TInstant> leaseDeadline = deadlineResult.ColumnParser(0).GetOptionalTimestamp();
+ if (!leaseDeadline) {
+ Finish(Ydb::StatusIds::INTERNAL_ERROR, "Unexpected operation state");
+ return;
+ }
+
+ if (*leaseDeadline < TInstant::Now()) {
+ finishing = true;
+
+ metadata.set_exec_status(Ydb::Query::EXEC_STATUS_ABORTED);
+ Ready = true;
+ Issues = LeaseExpiredIssues();
+
+ FinishScriptExecution(Request->Get()->Database, ExecutionId, Ydb::StatusIds::ABORTED, Ydb::Query::EXEC_STATUS_ABORTED, Issues);
+ SetQueryResultHandler(&TGetScriptExecutionOperationActor::OnFinishOperation);
+ }
}
Metadata.ConstructInPlace().PackFrom(metadata);
- Finish(Ydb::StatusIds::SUCCESS, std::move(issues));
+ if (!finishing) {
+ CommitTransaction();
+ }
+ }
+
+ void OnFinishOperation() {
+ Finish(Ydb::StatusIds::SUCCESS, std::move(Issues));
}
void OnFinish(Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues) override {
- Send(Request->Sender, new TEvGetScriptExecutionOperationResponse(Ready, status, std::move(issues), std::move(Metadata)));
+ if (status == Ydb::StatusIds::SUCCESS) {
+ Send(Request->Sender, new TEvGetScriptExecutionOperationResponse(Ready, status, std::move(Issues), std::move(Metadata)));
+ } else {
+ Send(Request->Sender, new TEvGetScriptExecutionOperationResponse(false, status, std::move(issues), Nothing()));
+ }
}
private:
TEvGetScriptExecutionOperation::TPtr Request;
+ TString ExecutionId;
bool Ready = false;
+ NYql::TIssues Issues;
TMaybe<google::protobuf::Any> Metadata;
};
-class TListScriptExecutionOperationsActor : public TQueryBase {
+class TListScriptExecutionOperationsQuery : public TQueryBase {
public:
- TListScriptExecutionOperationsActor(TEvListScriptExecutionOperations::TPtr ev)
- : Request(std::move(ev))
- , PageSize(ClampVal<ui64>(Request->Get()->PageSize, 1, 500))
+ TListScriptExecutionOperationsQuery(const TString& database, const TString& pageToken, ui64 pageSize)
+ : Database(database)
+ , PageToken(pageToken)
+ , PageSize(pageSize)
{}
static std::pair<TInstant, TString> ParsePageToken(const TString& token) {
@@ -825,7 +1001,7 @@ public:
void OnRunQuery() override {
TStringBuilder sql;
- if (Request->Get()->PageToken) {
+ if (PageToken) {
sql << R"(
DECLARE $execution_id AS Text;
DECLARE $ts AS Timestamp;
@@ -847,7 +1023,7 @@ public:
FROM `.metadata/script_executions`
WHERE database = $database
)";
- if (Request->Get()->PageToken) {
+ if (PageToken) {
sql << R"(
AND (start_ts, execution_id) <= ($ts, $execution_id)
)";
@@ -860,14 +1036,14 @@ public:
NYdb::TParamsBuilder params;
params
.AddParam("$database")
- .Utf8(Request->Get()->Database)
+ .Utf8(Database)
.Build()
.AddParam("$page_size")
.Uint64(PageSize + 1)
.Build();
- if (Request->Get()->PageToken) {
- auto pageTokenParts = ParsePageToken(Request->Get()->PageToken);
+ if (PageToken) {
+ auto pageTokenParts = ParsePageToken(PageToken);
params
.AddParam("$ts")
.Timestamp(pageTokenParts.first)
@@ -955,106 +1131,95 @@ public:
}
void OnFinish(Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues) override {
- Send(Request->Sender, new TEvListScriptExecutionOperationsResponse(status, std::move(issues), NextPageToken, std::move(Operations)));
+ Send(Owner, new TEvListScriptExecutionOperationsResponse(status, std::move(issues), NextPageToken, std::move(Operations)));
}
private:
- TEvListScriptExecutionOperations::TPtr Request;
- ui64 PageSize = 0;
+ const TString Database;
+ const TString PageToken;
+ const ui64 PageSize;
TString NextPageToken;
std::vector<Ydb::Operations::Operation> Operations;
};
-class TCheckLeaseStatusActor : public TScriptExecutionFinisherBase {
+class TListScriptExecutionOperationsActor : public TActorBootstrapped<TListScriptExecutionOperationsActor> {
public:
- TCheckLeaseStatusActor(const TString& database, const TString& executionId, Ydb::StatusIds::StatusCode statusOnExpiredLease = Ydb::StatusIds::ABORTED)
- : Database(database)
- , ExecutionId(executionId)
- , StatusOnExpiredLease(statusOnExpiredLease)
+ TListScriptExecutionOperationsActor(TEvListScriptExecutionOperations::TPtr ev)
+ : Request(std::move(ev))
{}
- void OnRunQuery() override {
- const TString sql = R"(
- DECLARE $database AS Text;
- DECLARE $execution_id AS Text;
-
- SELECT operation_status FROM `.metadata/script_executions`
- WHERE database = $database AND execution_id = $execution_id;
-
- SELECT lease_deadline FROM `.metadata/script_execution_leases`
- WHERE database = $database AND execution_id = $execution_id;
- )";
-
- NYdb::TParamsBuilder params;
- params
- .AddParam("$database")
- .Utf8(Database)
- .Build()
- .AddParam("$execution_id")
- .Utf8(ExecutionId)
- .Build();
+ void Bootstrap() {
+ const ui64 pageSize = ClampVal<ui64>(Request->Get()->PageSize, 1, 100);
+ Register(new TListScriptExecutionOperationsQuery(Request->Get()->Database, Request->Get()->PageToken, pageSize));
- RunDataQuery(sql, &params, TTxControl::BeginTx());
+ Become(&TListScriptExecutionOperationsActor::StateFunc);
}
- void OnQueryResult() override {
- if (!FinishWasRun) {
- if (ResultSets.size() != 2) {
- Finish(Ydb::StatusIds::INTERNAL_ERROR, "Unexpected database response");
- return;
- }
- NYdb::TResultSetParser result(ResultSets[0]);
- if (result.RowsCount() == 0) {
- Finish(Ydb::StatusIds::BAD_REQUEST, "No such execution");
- return;
- }
-
- result.TryNextRow();
+ STRICT_STFUNC(StateFunc,
+ hFunc(TEvListScriptExecutionOperationsResponse, Handle);
+ hFunc(TEvPrivate::TEvLeaseCheckResult, Handle);
+ )
- TMaybe<i32> operationStatus = result.ColumnParser(0).GetOptionalInt32();
- TMaybe<TInstant> leaseDeadline;
+ void Handle(TEvListScriptExecutionOperationsResponse::TPtr& ev) {
+ Response = std::move(ev);
- NYdb::TResultSetParser result2(ResultSets[1]);
+ for (ui64 i = 0; i < Response->Get()->Operations.size(); ++i) {
+ const Ydb::Operations::Operation& op = Response->Get()->Operations[i];
+ if (!op.ready()) {
+ Ydb::Query::ExecuteScriptMetadata metadata;
+ op.metadata().UnpackTo(&metadata);
+ Register(new TCheckLeaseStatusActor(Request->Get()->Database, metadata.execution_id(), Ydb::StatusIds::ABORTED, i));
+ ++OperationsToCheck;
+ }
+ }
- if (result2.RowsCount() > 0) {
- result2.TryNextRow();
+ if (OperationsToCheck == 0) {
+ Reply();
+ }
+ }
- leaseDeadline = result2.ColumnParser(0).GetOptionalTimestamp();
- }
+ void Handle(TEvPrivate::TEvLeaseCheckResult::TPtr& ev) {
+ Y_VERIFY(ev->Cookie < Response->Get()->Operations.size());
+
+ if (ev->Get()->Status != Ydb::StatusIds::SUCCESS) {
+ Response->Get()->Status = ev->Get()->Status;
+ Response->Get()->Issues = std::move(ev->Get()->Issues);
+ Response->Get()->NextPageToken.clear();
+ Response->Get()->Operations.clear();
+ Reply();
+ return;
+ }
- if (leaseDeadline) {
- if (operationStatus) {
- Finish(Ydb::StatusIds::INTERNAL_ERROR, "Invalid operation state");
- } else if (*leaseDeadline < RunStartTime) {
- FinishWasRun = true;
- FinishScriptExecution(Database, ExecutionId, StatusOnExpiredLease, Ydb::Query::EXEC_STATUS_ABORTED, "Lease expired");
- } else {
- // OperationStatus is Nothing(): currently running
- Finish();
+ if (ev->Get()->OperationStatus) {
+ Ydb::Operations::Operation& op = Response->Get()->Operations[ev->Cookie];
+ op.set_status(*ev->Get()->OperationStatus);
+ Ydb::Query::ExecuteScriptMetadata metadata;
+ op.metadata().UnpackTo(&metadata);
+ Y_VERIFY(ev->Get()->ExecutionStatus);
+ metadata.set_exec_status(*ev->Get()->ExecutionStatus);
+ op.mutable_metadata()->PackFrom(metadata);
+ if (ev->Get()->OperationIssues) {
+ for (const NYql::TIssue& issue : *ev->Get()->OperationIssues) {
+ NYql::IssueToMessage(issue, op.add_issues());
}
- } else if (operationStatus) {
- OperationStatus = static_cast<Ydb::StatusIds::StatusCode>(*operationStatus);
- Finish();
- } else {
- Finish(Ydb::StatusIds::INTERNAL_ERROR, "Invalid operation state");
}
- } else {
- OperationStatus = StatusOnExpiredLease;
- Finish();
+ }
+
+ --OperationsToCheck;
+ if (OperationsToCheck == 0) {
+ Reply();
}
}
- void OnFinish(Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues) override {
- Send(Owner, new TEvPrivate::TEvLeaseCheckResult(status, std::move(issues), OperationStatus));
+ void Reply() {
+ Send(Request->Sender, Response->Release().Release());
+ PassAway();
}
private:
- const TInstant RunStartTime = TInstant::Now();
- const TString Database;
- const TString ExecutionId;
- const Ydb::StatusIds::StatusCode StatusOnExpiredLease;
- TMaybe<Ydb::StatusIds::StatusCode> OperationStatus;
- bool FinishWasRun = false;
+ TEvListScriptExecutionOperations::TPtr Request;
+ TEvListScriptExecutionOperationsResponse::TPtr Response;
+ ui64 OperationsToCheck = 0;
};
class TCancelScriptExecutionOperationActor : public NActors::TActorBootstrapped<TCancelScriptExecutionOperationActor> {
@@ -1121,12 +1286,12 @@ public:
if (ev->Get()->Reason == NActors::TEvents::TEvUndelivered::ReasonActorUnknown) { // The actor probably had finished before our cancel message arrived.
Register(new TCheckLeaseStatusActor(Request->Get()->Database, ExecutionId)); // Check if the operation has finished.
} else {
- Reply(Ydb::StatusIds::UNAVAILABLE, "Failed to deliver request to destination");
+ Reply(Ydb::StatusIds::UNAVAILABLE, "Failed to deliver cancel request to destination");
}
}
void Handle(NActors::TEvInterconnect::TEvNodeDisconnected::TPtr&) {
- Reply(Ydb::StatusIds::UNAVAILABLE, "Failed to deliver request to destination");
+ Reply(Ydb::StatusIds::UNAVAILABLE, "Failed to deliver cancel request to destination");
}
void Reply(Ydb::StatusIds::StatusCode status, NYql::TIssues issues = {}) {
diff --git a/ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp b/ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp
index d440065241..0b2f808a22 100644
--- a/ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp
+++ b/ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp
@@ -36,6 +36,7 @@ class TRunScriptActor : public NActors::TActorBootstrapped<TRunScriptActor> {
Running,
Cancelling,
Cancelled,
+ Finishing,
Finished,
};
public:
@@ -147,7 +148,7 @@ private:
Send(ev->Sender, resp.Release());
- if (!IsFinished() && !IsTruncated()) {
+ if (IsExecuting() && !IsTruncated()) {
MergeResultSet(ev);
}
}
@@ -166,7 +167,7 @@ private:
void Handle(TEvKqp::TEvFetchScriptResultsRequest::TPtr& ev) {
auto resp = MakeHolder<TEvKqp::TEvFetchScriptResultsResponse>();
- if (!IsFinished()) {
+ if (IsExecuting()) {
if (RunState == ERunState::Created || RunState == ERunState::Running) {
resp->Record.SetStatus(Ydb::StatusIds::BAD_REQUEST);
resp->Record.AddIssues()->set_message("Results are not ready");
@@ -215,6 +216,9 @@ private:
case ERunState::Cancelled:
Send(ev->Sender, new TEvKqp::TEvCancelScriptExecutionResponse(Ydb::StatusIds::PRECONDITION_FAILED, "Already cancelled"));
break;
+ case ERunState::Finishing:
+ CancelRequests.emplace_front(std::move(ev));
+ break;
case ERunState::Finished:
Send(ev->Sender, new TEvKqp::TEvCancelScriptExecutionResponse(Ydb::StatusIds::PRECONDITION_FAILED, "Already finished"));
break;
@@ -240,11 +244,23 @@ private:
void Handle(TEvScriptExecutionFinished::TPtr& ev) {
if (RunState == ERunState::Cancelling) {
RunState = ERunState::Cancelled;
- for (auto& req : CancelRequests) {
- Send(req->Sender, new TEvKqp::TEvCancelScriptExecutionResponse(ev->Get()->Status, ev->Get()->Issues));
- }
- CancelRequests.clear();
}
+
+ if (RunState == ERunState::Finishing) {
+ RunState = ERunState::Finished;
+ }
+
+ Ydb::StatusIds::StatusCode status = ev->Get()->Status;
+ NYql::TIssues issues = std::move(ev->Get()->Issues);
+ if (RunState == ERunState::Finished) {
+ status = Ydb::StatusIds::PRECONDITION_FAILED;
+ issues.Clear();
+ issues.AddIssue("Already finished");
+ }
+ for (auto& req : CancelRequests) {
+ Send(req->Sender, new TEvKqp::TEvCancelScriptExecutionResponse(status, issues));
+ }
+ CancelRequests.clear();
}
void MergeResultSet(TEvKqpExecuter::TEvStreamData::TPtr& ev) {
@@ -281,7 +297,7 @@ private:
}
}
- void Finish(Ydb::StatusIds::StatusCode status, ERunState runState = ERunState::Finished) {
+ void Finish(Ydb::StatusIds::StatusCode status, ERunState runState = ERunState::Finishing) {
RunState = runState;
Status = status;
Register(CreateScriptExecutionFinisher(ActorIdToScriptExecutionId(SelfId()), Database, LeaseGeneration, status, GetExecStatusFromStatusCode(status), Issues));
@@ -297,8 +313,11 @@ private:
NActors::TActorBootstrapped<TRunScriptActor>::PassAway();
}
- bool IsFinished() const {
- return RunState == ERunState::Finished;
+ bool IsExecuting() const {
+ return RunState != ERunState::Finished
+ && RunState != ERunState::Finishing
+ && RunState != ERunState::Cancelled
+ && RunState != ERunState::Cancelling;
}
bool IsTruncated() const {
diff --git a/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp b/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp
index e73b0b6017..75719c99f5 100644
--- a/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp
+++ b/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp
@@ -327,7 +327,8 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
}
auto op = opClient.Get<NYdb::NQuery::TScriptExecutionOperation>(scriptExecutionOperation.Id()).ExtractValueSync();
- UNIT_ASSERT(op.Ready());
+ UNIT_ASSERT_C(op.Status().IsSuccess(), op.Status().GetIssues().ToString());
+ UNIT_ASSERT_C(op.Ready(), op.Status().GetIssues().ToString());
UNIT_ASSERT(op.Metadata().ExecStatus == EExecStatus::Completed || op.Metadata().ExecStatus == EExecStatus::Canceled);
UNIT_ASSERT_EQUAL(op.Metadata().ExecutionId, scriptExecutionOperation.Metadata().ExecutionId);
UNIT_ASSERT(op.Status().GetStatus() == NYdb::EStatus::SUCCESS || op.Status().GetStatus() == NYdb::EStatus::CANCELLED);
diff --git a/ydb/library/query_actor/query_actor.cpp b/ydb/library/query_actor/query_actor.cpp
index af28f8ce94..36d26a4b5c 100644
--- a/ydb/library/query_actor/query_actor.cpp
+++ b/ydb/library/query_actor/query_actor.cpp
@@ -108,6 +108,17 @@ TQueryBase::TEvQueryBasePrivate::TEvRollbackTransactionResponse::TEvRollbackTran
{
}
+TQueryBase::TEvQueryBasePrivate::TEvCommitTransactionResponse::TEvCommitTransactionResponse(Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues)
+ : Status(status)
+ , Issues(std::move(issues))
+{
+}
+
+TQueryBase::TEvQueryBasePrivate::TEvCommitTransactionResponse::TEvCommitTransactionResponse(const Ydb::Table::CommitTransactionResponse& resp)
+ : TEvCommitTransactionResponse(resp.operation().status(), IssuesFromOperation(resp.operation()))
+{
+}
+
TQueryBase::TQueryBase(ui64 logComponent, TString sessionId, TString database)
: LogComponent(logComponent)
, Database(std::move(database))
@@ -125,6 +136,7 @@ STRICT_STFUNC(TQueryBase::StateFunc,
hFunc(TEvQueryBasePrivate::TEvCreateSessionResult, Handle);
hFunc(TEvQueryBasePrivate::TEvDeleteSessionResult, Handle);
hFunc(TEvQueryBasePrivate::TEvRollbackTransactionResponse, Handle);
+ hFunc(TEvQueryBasePrivate::TEvCommitTransactionResponse, Handle);
);
void TQueryBase::Bootstrap() {
@@ -172,11 +184,11 @@ void TQueryBase::Handle(TEvQueryBasePrivate::TEvDataQueryResult::TPtr& ev) {
ResultSets.emplace_back(std::move(resultSet));
}
try {
- OnQueryResult();
+ (this->*QueryResultHandler)();
} catch (const std::exception& ex) {
Finish(Ydb::StatusIds::INTERNAL_ERROR, ex.what());
}
- Y_VERIFY(Finished || RunningQuery);
+ Y_VERIFY(Finished || RunningQuery || RunningCommit);
} else {
Finish(ev->Get()->Status, std::move(ev->Get()->Issues));
}
@@ -193,6 +205,18 @@ void TQueryBase::Handle(TEvQueryBasePrivate::TEvRollbackTransactionResponse::TPt
}
}
+void TQueryBase::Handle(TEvQueryBasePrivate::TEvCommitTransactionResponse::TPtr& ev) {
+ LOG_D("CommitTransactionResult: " << ev->Get()->Status << ". Issues: " << ev->Get()->Issues.ToOneLineString());
+
+ OnFinish(ev->Get()->Status, std::move(ev->Get()->Issues));
+
+ if (DeleteSession) {
+ RunDeleteSession();
+ } else {
+ PassAway();
+ }
+}
+
void TQueryBase::Finish(Ydb::StatusIds::StatusCode status, const TString& message, bool rollbackOnError) {
NYql::TIssues issues;
issues.AddIssue(message);
@@ -278,4 +302,21 @@ void TQueryBase::RollbackTransaction() {
Subscribe<Ydb::Table::RollbackTransactionResponse, TEvQueryBasePrivate::TEvRollbackTransactionResponse>(NRpcService::DoLocalRpc<TEvRollbackTransactionRequest>(std::move(req), Database, Nothing(), TActivationContext::ActorSystem(), true));
}
+void TQueryBase::CommitTransaction() {
+ RunningCommit = true;
+ Y_VERIFY(SessionId);
+ Y_VERIFY(TxId);
+ LOG_D("Commit transaction: " << TxId);
+ using TEvCommitTransactionRequest = NGRpcService::TGrpcRequestOperationCall<Ydb::Table::CommitTransactionRequest,
+ Ydb::Table::CommitTransactionResponse>;
+ Ydb::Table::CommitTransactionRequest req;
+ req.set_session_id(SessionId);
+ req.set_tx_id(TxId);
+ Subscribe<Ydb::Table::CommitTransactionResponse, TEvQueryBasePrivate::TEvCommitTransactionResponse>(NRpcService::DoLocalRpc<TEvCommitTransactionRequest>(std::move(req), Database, Nothing(), TActivationContext::ActorSystem(), true));
+}
+
+void TQueryBase::CallOnQueryResult() {
+ OnQueryResult();
+}
+
} // namespace NKikimr
diff --git a/ydb/library/query_actor/query_actor.h b/ydb/library/query_actor/query_actor.h
index 10f69a492a..a176ddead2 100644
--- a/ydb/library/query_actor/query_actor.h
+++ b/ydb/library/query_actor/query_actor.h
@@ -31,6 +31,8 @@ protected:
bool Continue = false;
};
+ using TQueryResultHandler = void (TQueryBase::*)();
+
private:
struct TEvQueryBasePrivate {
// Event ids
@@ -39,6 +41,7 @@ private:
EvCreateSessionResult,
EvDeleteSessionResult,
EvRollbackTransactionResponse,
+ EvCommitTransactionResponse,
EvEnd
};
@@ -81,6 +84,14 @@ private:
Ydb::StatusIds::StatusCode Status;
NYql::TIssues Issues;
};
+
+ struct TEvCommitTransactionResponse : public NActors::TEventLocal<TEvCommitTransactionResponse, EvCommitTransactionResponse> {
+ TEvCommitTransactionResponse(Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues);
+ TEvCommitTransactionResponse(const Ydb::Table::CommitTransactionResponse& resp);
+
+ Ydb::StatusIds::StatusCode Status;
+ NYql::TIssues Issues;
+ };
};
public:
@@ -99,11 +110,17 @@ protected:
void Finish();
void RunDataQuery(const TString& sql, NYdb::TParamsBuilder* params = nullptr, TTxControl txControl = TTxControl::BeginAndCommitTx());
+ void CommitTransaction();
+
+ template <class THandlerFunc>
+ void SetQueryResultHandler(THandlerFunc handler) {
+ QueryResultHandler = static_cast<TQueryResultHandler>(handler);
+ }
private:
// Methods for implementing in derived classes.
virtual void OnRunQuery() = 0;
- virtual void OnQueryResult() = 0; // Must either run next query or finish
+ virtual void OnQueryResult() {} // Must either run next query or finish
virtual void OnFinish(Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues) = 0;
private:
@@ -124,12 +141,15 @@ private:
void Handle(TEvQueryBasePrivate::TEvDeleteSessionResult::TPtr& ev);
void Handle(TEvQueryBasePrivate::TEvDataQueryResult::TPtr& ev);
void Handle(TEvQueryBasePrivate::TEvRollbackTransactionResponse::TPtr& ev);
+ void Handle(TEvQueryBasePrivate::TEvCommitTransactionResponse::TPtr& ev);
void RunQuery();
void RunCreateSession();
void RunDeleteSession();
void RollbackTransaction();
+ void CallOnQueryResult();
+
protected:
const ui64 LogComponent;
const TString Database;
@@ -137,9 +157,12 @@ protected:
TString TxId;
bool DeleteSession = false;
bool RunningQuery = false;
+ bool RunningCommit = false;
bool Finished = false;
bool CommitRequested = false;
+ TQueryResultHandler QueryResultHandler = &TQueryBase::CallOnQueryResult;
+
NActors::TActorId Owner;
std::vector<NYdb::TResultSet> ResultSets;
diff --git a/ydb/library/query_actor/query_actor_ut.cpp b/ydb/library/query_actor/query_actor_ut.cpp
index 131847cedb..b0b072c175 100644
--- a/ydb/library/query_actor/query_actor_ut.cpp
+++ b/ydb/library/query_actor/query_actor_ut.cpp
@@ -164,6 +164,28 @@ Y_UNIT_TEST_SUITE(QueryActorTest) {
assertValues();
}
+
+ Y_UNIT_TEST(Commit) {
+ TTestServer server;
+
+ struct TSelectQuery : public TTestQueryActorBase {
+ void OnRunQuery() override {
+ RunDataQuery("SELECT * FROM TestTable", nullptr, TTxControl::BeginTx());
+ SetQueryResultHandler(&TSelectQuery::MyResultHandler);
+ }
+
+ void MyResultHandler() {
+ CommitTransaction(); // Finish will be after successful commit
+ }
+
+ void OnQueryResult() override {
+ UNIT_ASSERT(false);
+ }
+ };
+
+ auto result = server.RunQueryActor<TSelectQuery>();
+ UNIT_ASSERT_VALUES_EQUAL(result.StatusCode, Ydb::StatusIds::SUCCESS);
+ }
}
} // namespace NKikimr