diff options
-rw-r--r-- | ydb/core/kqp/proxy_service/kqp_script_executions.cpp | 182 |
1 files changed, 35 insertions, 147 deletions
diff --git a/ydb/core/kqp/proxy_service/kqp_script_executions.cpp b/ydb/core/kqp/proxy_service/kqp_script_executions.cpp index e33bb540b0..4c54a4177d 100644 --- a/ydb/core/kqp/proxy_service/kqp_script_executions.cpp +++ b/ydb/core/kqp/proxy_service/kqp_script_executions.cpp @@ -1256,153 +1256,13 @@ private: NYql::TIssues Issues; }; -class TGetScriptExecutionOperationBase : public TScriptExecutionFinisherBase { +class TGetScriptExecutionOperationActor : public TScriptExecutionFinisherBase { public: - explicit TGetScriptExecutionOperationBase(TEvGetScriptExecutionOperation::TPtr ev, Ydb::Query::ExecuteScriptMetadata&& metadata = Ydb::Query::ExecuteScriptMetadata(), NYql::TIssues&& issues = {}) + explicit TGetScriptExecutionOperationActor(TEvGetScriptExecutionOperation::TPtr ev) : Request(std::move(ev)) - , Metadata(metadata) - , Issues(issues) , StartActorTime(TInstant::Now()) {} - bool CheakLeaseDeadline(NYdb::TResultSetParser& deadlineResult, TDuration operationTtl, TDuration resultsTtl) { - deadlineResult.TryNextRow(); - - TMaybe<TInstant> leaseDeadline = deadlineResult.ColumnParser(0).GetOptionalTimestamp(); - if (!leaseDeadline) { - Finish(Ydb::StatusIds::INTERNAL_ERROR, "Unexpected operation state"); - return false; - } - - if (*leaseDeadline < StartActorTime) { - LeaseExpired = true; - Ready = true; - - FinishScriptExecution(Request->Get()->Database, Metadata.execution_id(), Ydb::StatusIds::ABORTED, Ydb::Query::EXEC_STATUS_ABORTED, operationTtl, resultsTtl, Issues); - SetQueryResultHandler(&TGetScriptExecutionOperationBase::OnFinishOperation); - return false; - } - - return true; - } - - void OnFinishOperation() { - Issues = LeaseExpiredIssues(); - Metadata.set_exec_status(Ydb::Query::EXEC_STATUS_ABORTED); - Finish(Ydb::StatusIds::SUCCESS, std::move(Issues)); - } - - void Reply(Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues) { - if (status == Ydb::StatusIds::SUCCESS) { - TMaybe<google::protobuf::Any> metadata; - metadata.ConstructInPlace().PackFrom(Metadata); - - Send(Request->Sender, new TEvGetScriptExecutionOperationResponse(Ready, status, std::move(Issues), std::move(metadata))); - } else { - Send(Request->Sender, new TEvGetScriptExecutionOperationResponse(false, status, std::move(issues), Nothing())); - } - } - -protected: - TEvGetScriptExecutionOperation::TPtr Request; - Ydb::Query::ExecuteScriptMetadata Metadata; - NYql::TIssues Issues; - TInstant StartActorTime; - bool LeaseExpired = false; - bool Ready = false; -}; - -class TGetScriptExecutionOperationAfterTliActor : public TGetScriptExecutionOperationBase { -public: - TGetScriptExecutionOperationAfterTliActor(TEvGetScriptExecutionOperation::TPtr ev, Ydb::Query::ExecuteScriptMetadata&& metadata, NYql::TIssues&& issues) - : TGetScriptExecutionOperationBase(std::move(ev), std::move(metadata), std::move(issues)) - {} - - void OnRunQuery() override { - TString sql = R"( - -- TGetScriptExecutionOperationAfterTliActor::OnRunQuery - DECLARE $database AS Text; - DECLARE $execution_id AS Text; - - SELECT execution_status, meta - FROM `.metadata/script_executions` - WHERE database = $database AND execution_id = $execution_id; - - SELECT lease_deadline - FROM `.metadata/script_execution_leases` - WHERE database = $database AND execution_id = $execution_id; - )"; - - NYdb::TParamsBuilder params; - params - .AddParam("$database") - .Utf8(Request->Get()->Database) - .Build() - .AddParam("$execution_id") - .Utf8(Metadata.execution_id()) - .Build(); - - RunDataQuery(sql, ¶ms, TTxControl::BeginTx()); - SetQueryResultHandler(&TGetScriptExecutionOperationAfterTliActor::OnGetInfo); - } - - void OnGetInfo() { - if (ResultSets.size() != 2) { - Finish(Ydb::StatusIds::INTERNAL_ERROR, "Unexpected database response"); - return; - } - NYdb::TResultSetParser result(ResultSets[0]); - if (result.RowsCount() == 0) { - Finish(Ydb::StatusIds::NOT_FOUND, "No such execution"); - return; - } - - result.TryNextRow(); - - const TMaybe<i32> executionStatus = result.ColumnParser("execution_status").GetOptionalInt32(); - if (executionStatus) { - Metadata.set_exec_status(static_cast<Ydb::Query::ExecStatus>(*executionStatus)); - } - - auto serializedMeta = result.ColumnParser("meta").GetOptionalJsonDocument(); - if (!serializedMeta) { - Finish(Ydb::StatusIds::INTERNAL_ERROR, "Missing operation metainformation"); - return; - } - const auto ttl = GetTtlFromSerializedMeta(*serializedMeta); - if (!ttl) { - Finish(Ydb::StatusIds::INTERNAL_ERROR, "Metainformation is corrupted"); - return; - } - - const auto [operationTtl, resultsTtl] = *ttl; - - NYdb::TResultSetParser deadlineResult(ResultSets[1]); - if (deadlineResult.RowsCount() == 0) { - Ready = true; - } else if (!CheakLeaseDeadline(deadlineResult, operationTtl, resultsTtl)) { - return; - } - - CommitTransaction(); - } - - void OnFinish(Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues) override { - if (status == Ydb::StatusIds::ABORTED && LeaseExpired) { - Metadata.set_exec_status(Ydb::Query::EXEC_STATUS_ABORTED); - status = Ydb::StatusIds::SUCCESS; - } - - Reply(status, std::move(issues)); - } -}; - -class TGetScriptExecutionOperationActor : public TGetScriptExecutionOperationBase { -public: - explicit TGetScriptExecutionOperationActor(TEvGetScriptExecutionOperation::TPtr ev) - : TGetScriptExecutionOperationBase(std::move(ev)) - {} - void OnRunQuery() override { TString sql = R"( -- TGetScriptExecutionOperationActor::OnRunQuery @@ -1546,25 +1406,53 @@ public: } const auto [operationTtl, resultsTtl] = *ttl; - if (!CheakLeaseDeadline(deadlineResult, operationTtl, resultsTtl)) { + deadlineResult.TryNextRow(); + + TMaybe<TInstant> leaseDeadline = deadlineResult.ColumnParser(0).GetOptionalTimestamp(); + if (!leaseDeadline) { + Finish(Ydb::StatusIds::INTERNAL_ERROR, "Unexpected operation state"); return; } + if (*leaseDeadline < StartActorTime) { + LeaseExpired = true; + FinishScriptExecution(Request->Get()->Database, Metadata.execution_id(), Ydb::StatusIds::ABORTED, Ydb::Query::EXEC_STATUS_ABORTED, operationTtl, resultsTtl, Issues); + SetQueryResultHandler(&TGetScriptExecutionOperationActor::OnFinishOperation); + } } - CommitTransaction(); + if (!LeaseExpired) { + CommitTransaction(); + } + } + + void OnFinishOperation() { + Ready = true; + Issues = LeaseExpiredIssues(); + Metadata.set_exec_status(Ydb::Query::EXEC_STATUS_ABORTED); + + Finish(Ydb::StatusIds::SUCCESS, std::move(Issues)); } void OnFinish(Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues) override { - if (status == Ydb::StatusIds::ABORTED && LeaseExpired) { - Register(new TGetScriptExecutionOperationAfterTliActor(std::move(Request), std::move(Metadata), std::move(Issues))); + if (status == Ydb::StatusIds::SUCCESS || status == Ydb::StatusIds::ABORTED && LeaseExpired) { + TMaybe<google::protobuf::Any> metadata; + metadata.ConstructInPlace().PackFrom(Metadata); + + Send(Request->Sender, new TEvGetScriptExecutionOperationResponse(Ready, Ydb::StatusIds::SUCCESS, std::move(Issues), std::move(metadata))); } else { - Reply(status, std::move(issues)); + Send(Request->Sender, new TEvGetScriptExecutionOperationResponse(false, status, std::move(issues), Nothing())); } } private: + TEvGetScriptExecutionOperation::TPtr Request; + TInstant StartActorTime; TString ExecutionId; + bool Ready = false; + bool LeaseExpired = false; + NYql::TIssues Issues; + Ydb::Query::ExecuteScriptMetadata Metadata; }; class TListScriptExecutionOperationsQuery : public TQueryBase { |