aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--ydb/core/kqp/proxy_service/kqp_script_executions.cpp182
1 files changed, 35 insertions, 147 deletions
diff --git a/ydb/core/kqp/proxy_service/kqp_script_executions.cpp b/ydb/core/kqp/proxy_service/kqp_script_executions.cpp
index e33bb540b0..4c54a4177d 100644
--- a/ydb/core/kqp/proxy_service/kqp_script_executions.cpp
+++ b/ydb/core/kqp/proxy_service/kqp_script_executions.cpp
@@ -1256,153 +1256,13 @@ private:
NYql::TIssues Issues;
};
-class TGetScriptExecutionOperationBase : public TScriptExecutionFinisherBase {
+class TGetScriptExecutionOperationActor : public TScriptExecutionFinisherBase {
public:
- explicit TGetScriptExecutionOperationBase(TEvGetScriptExecutionOperation::TPtr ev, Ydb::Query::ExecuteScriptMetadata&& metadata = Ydb::Query::ExecuteScriptMetadata(), NYql::TIssues&& issues = {})
+ explicit TGetScriptExecutionOperationActor(TEvGetScriptExecutionOperation::TPtr ev)
: Request(std::move(ev))
- , Metadata(metadata)
- , Issues(issues)
, StartActorTime(TInstant::Now())
{}
- bool CheakLeaseDeadline(NYdb::TResultSetParser& deadlineResult, TDuration operationTtl, TDuration resultsTtl) {
- deadlineResult.TryNextRow();
-
- TMaybe<TInstant> leaseDeadline = deadlineResult.ColumnParser(0).GetOptionalTimestamp();
- if (!leaseDeadline) {
- Finish(Ydb::StatusIds::INTERNAL_ERROR, "Unexpected operation state");
- return false;
- }
-
- if (*leaseDeadline < StartActorTime) {
- LeaseExpired = true;
- Ready = true;
-
- FinishScriptExecution(Request->Get()->Database, Metadata.execution_id(), Ydb::StatusIds::ABORTED, Ydb::Query::EXEC_STATUS_ABORTED, operationTtl, resultsTtl, Issues);
- SetQueryResultHandler(&TGetScriptExecutionOperationBase::OnFinishOperation);
- return false;
- }
-
- return true;
- }
-
- void OnFinishOperation() {
- Issues = LeaseExpiredIssues();
- Metadata.set_exec_status(Ydb::Query::EXEC_STATUS_ABORTED);
- Finish(Ydb::StatusIds::SUCCESS, std::move(Issues));
- }
-
- void Reply(Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues) {
- if (status == Ydb::StatusIds::SUCCESS) {
- TMaybe<google::protobuf::Any> metadata;
- metadata.ConstructInPlace().PackFrom(Metadata);
-
- Send(Request->Sender, new TEvGetScriptExecutionOperationResponse(Ready, status, std::move(Issues), std::move(metadata)));
- } else {
- Send(Request->Sender, new TEvGetScriptExecutionOperationResponse(false, status, std::move(issues), Nothing()));
- }
- }
-
-protected:
- TEvGetScriptExecutionOperation::TPtr Request;
- Ydb::Query::ExecuteScriptMetadata Metadata;
- NYql::TIssues Issues;
- TInstant StartActorTime;
- bool LeaseExpired = false;
- bool Ready = false;
-};
-
-class TGetScriptExecutionOperationAfterTliActor : public TGetScriptExecutionOperationBase {
-public:
- TGetScriptExecutionOperationAfterTliActor(TEvGetScriptExecutionOperation::TPtr ev, Ydb::Query::ExecuteScriptMetadata&& metadata, NYql::TIssues&& issues)
- : TGetScriptExecutionOperationBase(std::move(ev), std::move(metadata), std::move(issues))
- {}
-
- void OnRunQuery() override {
- TString sql = R"(
- -- TGetScriptExecutionOperationAfterTliActor::OnRunQuery
- DECLARE $database AS Text;
- DECLARE $execution_id AS Text;
-
- SELECT execution_status, meta
- FROM `.metadata/script_executions`
- WHERE database = $database AND execution_id = $execution_id;
-
- SELECT lease_deadline
- FROM `.metadata/script_execution_leases`
- WHERE database = $database AND execution_id = $execution_id;
- )";
-
- NYdb::TParamsBuilder params;
- params
- .AddParam("$database")
- .Utf8(Request->Get()->Database)
- .Build()
- .AddParam("$execution_id")
- .Utf8(Metadata.execution_id())
- .Build();
-
- RunDataQuery(sql, &params, TTxControl::BeginTx());
- SetQueryResultHandler(&TGetScriptExecutionOperationAfterTliActor::OnGetInfo);
- }
-
- void OnGetInfo() {
- if (ResultSets.size() != 2) {
- Finish(Ydb::StatusIds::INTERNAL_ERROR, "Unexpected database response");
- return;
- }
- NYdb::TResultSetParser result(ResultSets[0]);
- if (result.RowsCount() == 0) {
- Finish(Ydb::StatusIds::NOT_FOUND, "No such execution");
- return;
- }
-
- result.TryNextRow();
-
- const TMaybe<i32> executionStatus = result.ColumnParser("execution_status").GetOptionalInt32();
- if (executionStatus) {
- Metadata.set_exec_status(static_cast<Ydb::Query::ExecStatus>(*executionStatus));
- }
-
- auto serializedMeta = result.ColumnParser("meta").GetOptionalJsonDocument();
- if (!serializedMeta) {
- Finish(Ydb::StatusIds::INTERNAL_ERROR, "Missing operation metainformation");
- return;
- }
- const auto ttl = GetTtlFromSerializedMeta(*serializedMeta);
- if (!ttl) {
- Finish(Ydb::StatusIds::INTERNAL_ERROR, "Metainformation is corrupted");
- return;
- }
-
- const auto [operationTtl, resultsTtl] = *ttl;
-
- NYdb::TResultSetParser deadlineResult(ResultSets[1]);
- if (deadlineResult.RowsCount() == 0) {
- Ready = true;
- } else if (!CheakLeaseDeadline(deadlineResult, operationTtl, resultsTtl)) {
- return;
- }
-
- CommitTransaction();
- }
-
- void OnFinish(Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues) override {
- if (status == Ydb::StatusIds::ABORTED && LeaseExpired) {
- Metadata.set_exec_status(Ydb::Query::EXEC_STATUS_ABORTED);
- status = Ydb::StatusIds::SUCCESS;
- }
-
- Reply(status, std::move(issues));
- }
-};
-
-class TGetScriptExecutionOperationActor : public TGetScriptExecutionOperationBase {
-public:
- explicit TGetScriptExecutionOperationActor(TEvGetScriptExecutionOperation::TPtr ev)
- : TGetScriptExecutionOperationBase(std::move(ev))
- {}
-
void OnRunQuery() override {
TString sql = R"(
-- TGetScriptExecutionOperationActor::OnRunQuery
@@ -1546,25 +1406,53 @@ public:
}
const auto [operationTtl, resultsTtl] = *ttl;
- if (!CheakLeaseDeadline(deadlineResult, operationTtl, resultsTtl)) {
+ deadlineResult.TryNextRow();
+
+ TMaybe<TInstant> leaseDeadline = deadlineResult.ColumnParser(0).GetOptionalTimestamp();
+ if (!leaseDeadline) {
+ Finish(Ydb::StatusIds::INTERNAL_ERROR, "Unexpected operation state");
return;
}
+ if (*leaseDeadline < StartActorTime) {
+ LeaseExpired = true;
+ FinishScriptExecution(Request->Get()->Database, Metadata.execution_id(), Ydb::StatusIds::ABORTED, Ydb::Query::EXEC_STATUS_ABORTED, operationTtl, resultsTtl, Issues);
+ SetQueryResultHandler(&TGetScriptExecutionOperationActor::OnFinishOperation);
+ }
}
- CommitTransaction();
+ if (!LeaseExpired) {
+ CommitTransaction();
+ }
+ }
+
+ void OnFinishOperation() {
+ Ready = true;
+ Issues = LeaseExpiredIssues();
+ Metadata.set_exec_status(Ydb::Query::EXEC_STATUS_ABORTED);
+
+ Finish(Ydb::StatusIds::SUCCESS, std::move(Issues));
}
void OnFinish(Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues) override {
- if (status == Ydb::StatusIds::ABORTED && LeaseExpired) {
- Register(new TGetScriptExecutionOperationAfterTliActor(std::move(Request), std::move(Metadata), std::move(Issues)));
+ if (status == Ydb::StatusIds::SUCCESS || status == Ydb::StatusIds::ABORTED && LeaseExpired) {
+ TMaybe<google::protobuf::Any> metadata;
+ metadata.ConstructInPlace().PackFrom(Metadata);
+
+ Send(Request->Sender, new TEvGetScriptExecutionOperationResponse(Ready, Ydb::StatusIds::SUCCESS, std::move(Issues), std::move(metadata)));
} else {
- Reply(status, std::move(issues));
+ Send(Request->Sender, new TEvGetScriptExecutionOperationResponse(false, status, std::move(issues), Nothing()));
}
}
private:
+ TEvGetScriptExecutionOperation::TPtr Request;
+ TInstant StartActorTime;
TString ExecutionId;
+ bool Ready = false;
+ bool LeaseExpired = false;
+ NYql::TIssues Issues;
+ Ydb::Query::ExecuteScriptMetadata Metadata;
};
class TListScriptExecutionOperationsQuery : public TQueryBase {