diff options
author | grigoriypisar <grigoriypisar@yandex-team.com> | 2023-08-25 19:14:10 +0300 |
---|---|---|
committer | grigoriypisar <grigoriypisar@yandex-team.com> | 2023-08-25 19:33:35 +0300 |
commit | 2a83f4fc1bdbc2452f24087cdeb2f68a35612507 (patch) | |
tree | 933460ec042dd0a0fdb389bf75b46227ecec407e | |
parent | f36aa51c663ece905630866ce7eff969024c7274 (diff) | |
download | ydb-2a83f4fc1bdbc2452f24087cdeb2f68a35612507.tar.gz |
Lease stabilisation
Temp changes
-rw-r--r-- | ydb/core/kqp/common/events/script_executions.h | 18 | ||||
-rw-r--r-- | ydb/core/kqp/common/simple/kqp_event_ids.h | 2 | ||||
-rw-r--r-- | ydb/core/kqp/proxy_service/kqp_proxy_service.cpp | 12 | ||||
-rw-r--r-- | ydb/core/kqp/proxy_service/kqp_script_executions.cpp | 143 | ||||
-rw-r--r-- | ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp | 29 | ||||
-rw-r--r-- | ydb/core/protos/kqp.proto | 7 |
6 files changed, 175 insertions, 36 deletions
diff --git a/ydb/core/kqp/common/events/script_executions.h b/ydb/core/kqp/common/events/script_executions.h index f00d6294a5b..3627d9d6760 100644 --- a/ydb/core/kqp/common/events/script_executions.h +++ b/ydb/core/kqp/common/events/script_executions.h @@ -1,5 +1,6 @@ #pragma once #include <ydb/core/kqp/common/simple/kqp_event_ids.h> +#include <ydb/core/protos/kqp.pb.h> #include <ydb/library/yql/public/issue/yql_issue.h> #include <ydb/public/api/protos/ydb_operation.pb.h> #include <ydb/public/api/protos/ydb_status_codes.pb.h> @@ -47,8 +48,10 @@ struct TEvGetScriptExecutionOperation : public NActors::TEventLocal<TEvGetScript }; struct TEvGetScriptExecutionOperationResponse : public NActors::TEventLocal<TEvGetScriptExecutionOperationResponse, TKqpScriptExecutionEvents::EvGetScriptExecutionOperationResponse> { - TEvGetScriptExecutionOperationResponse(bool ready, Ydb::StatusIds::StatusCode status, NYql::TIssues issues, TMaybe<google::protobuf::Any> metadata) + TEvGetScriptExecutionOperationResponse(bool ready, bool leaseExpired, TActorId runScriptActorId, Ydb::StatusIds::StatusCode status, NYql::TIssues issues, TMaybe<google::protobuf::Any> metadata) : Ready(ready) + , LeaseExpired(leaseExpired) + , RunScriptActorId(runScriptActorId) , Status(status) , Issues(std::move(issues)) , Metadata(std::move(metadata)) @@ -57,12 +60,15 @@ struct TEvGetScriptExecutionOperationResponse : public NActors::TEventLocal<TEvG TEvGetScriptExecutionOperationResponse(Ydb::StatusIds::StatusCode status, NYql::TIssues issues) : Ready(false) + , LeaseExpired(false) , Status(status) , Issues(std::move(issues)) { } bool Ready; + bool LeaseExpired; + TActorId RunScriptActorId; Ydb::StatusIds::StatusCode Status; NYql::TIssues Issues; TMaybe<google::protobuf::Any> Metadata; @@ -102,18 +108,26 @@ struct TEvListScriptExecutionOperationsResponse : public NActors::TEventLocal<TE }; struct TEvScriptLeaseUpdateResponse : public NActors::TEventLocal<TEvScriptLeaseUpdateResponse, 
TKqpScriptExecutionEvents::EvScriptLeaseUpdateResponse> { - TEvScriptLeaseUpdateResponse(bool executionEntryExists, Ydb::StatusIds::StatusCode status, NYql::TIssues issues) + TEvScriptLeaseUpdateResponse(bool executionEntryExists, TInstant currentDeadline, Ydb::StatusIds::StatusCode status, NYql::TIssues issues) : ExecutionEntryExists(executionEntryExists) + , CurrentDeadline(currentDeadline) , Status(status) , Issues(std::move(issues)) { } bool ExecutionEntryExists; + TInstant CurrentDeadline; Ydb::StatusIds::StatusCode Status; NYql::TIssues Issues; }; +struct TEvCheckAliveRequest : public NActors::TEventPB<TEvCheckAliveRequest, NKikimrKqp::TEvCheckAliveRequest, TKqpScriptExecutionEvents::EvCheckAliveRequest> { +}; + +struct TEvCheckAliveResponse : public NActors::TEventPB<TEvCheckAliveResponse, NKikimrKqp::TEvCheckAliveResponse, TKqpScriptExecutionEvents::EvCheckAliveResponse> { +}; + struct TEvCancelScriptExecutionOperation : public NActors::TEventLocal<TEvCancelScriptExecutionOperation, TKqpScriptExecutionEvents::EvCancelScriptExecutionOperation> { explicit TEvCancelScriptExecutionOperation(const TString& database, const NOperationId::TOperationId& id) : Database(database) diff --git a/ydb/core/kqp/common/simple/kqp_event_ids.h b/ydb/core/kqp/common/simple/kqp_event_ids.h index 83327d6d0d5..ea89513a22a 100644 --- a/ydb/core/kqp/common/simple/kqp_event_ids.h +++ b/ydb/core/kqp/common/simple/kqp_event_ids.h @@ -139,6 +139,8 @@ struct TKqpScriptExecutionEvents { EvForgetScriptExecutionOperationResponse, EvSaveScriptResultMetaFinished, EvSaveScriptResultFinished, + EvCheckAliveRequest, + EvCheckAliveResponse, }; }; diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp index b6e083b14bb..053556b2623 100644 --- a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp +++ b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp @@ -659,7 +659,7 @@ public: void Handle(TEvKqp::TEvScriptRequest::TPtr& ev) { if 
(CheckScriptExecutionsTablesReady<TEvKqp::TEvScriptResponse>(ev)) { - Register(CreateScriptExecutionCreatorActor(std::move(ev))); + Register(CreateScriptExecutionCreatorActor(std::move(ev)), TMailboxType::HTSwap, AppData()->SystemPoolId); } } @@ -1464,7 +1464,7 @@ private: switch (ScriptExecutionsCreationStatus) { case EScriptExecutionsCreationStatus::NotStarted: ScriptExecutionsCreationStatus = EScriptExecutionsCreationStatus::Pending; - Register(CreateScriptExecutionsTablesCreator(MakeHolder<TEvPrivate::TEvScriptExecutionsTablesCreationFinished>())); + Register(CreateScriptExecutionsTablesCreator(MakeHolder<TEvPrivate::TEvScriptExecutionsTablesCreationFinished>()), TMailboxType::HTSwap, AppData()->SystemPoolId); [[fallthrough]]; case EScriptExecutionsCreationStatus::Pending: if (DelayedEventsQueue.size() < 10000) { @@ -1490,25 +1490,25 @@ private: void Handle(NKqp::TEvForgetScriptExecutionOperation::TPtr& ev) { if (CheckScriptExecutionsTablesReady<TEvForgetScriptExecutionOperationResponse>(ev)) { - Register(CreateForgetScriptExecutionOperationActor(std::move(ev))); + Register(CreateForgetScriptExecutionOperationActor(std::move(ev)), TMailboxType::HTSwap, AppData()->SystemPoolId); } } void Handle(NKqp::TEvGetScriptExecutionOperation::TPtr& ev) { if (CheckScriptExecutionsTablesReady<TEvGetScriptExecutionOperationResponse>(ev)) { - Register(CreateGetScriptExecutionOperationActor(std::move(ev))); + Register(CreateGetScriptExecutionOperationActor(std::move(ev)), TMailboxType::HTSwap, AppData()->SystemPoolId); } } void Handle(NKqp::TEvListScriptExecutionOperations::TPtr& ev) { if (CheckScriptExecutionsTablesReady<TEvListScriptExecutionOperationsResponse>(ev)) { - Register(CreateListScriptExecutionOperationsActor(std::move(ev))); + Register(CreateListScriptExecutionOperationsActor(std::move(ev)), TMailboxType::HTSwap, AppData()->SystemPoolId); } } void Handle(NKqp::TEvCancelScriptExecutionOperation::TPtr& ev) { if 
(CheckScriptExecutionsTablesReady<TEvCancelScriptExecutionOperationResponse>(ev)) { - Register(CreateCancelScriptExecutionOperationActor(std::move(ev))); + Register(CreateCancelScriptExecutionOperationActor(std::move(ev)), TMailboxType::HTSwap, AppData()->SystemPoolId); } } diff --git a/ydb/core/kqp/proxy_service/kqp_script_executions.cpp b/ydb/core/kqp/proxy_service/kqp_script_executions.cpp index 99a2dad4c15..d226a491d9d 100644 --- a/ydb/core/kqp/proxy_service/kqp_script_executions.cpp +++ b/ydb/core/kqp/proxy_service/kqp_script_executions.cpp @@ -403,14 +403,16 @@ public: return; } + LeaseDeadline = TInstant::Now() + LeaseDuration; + TString sql = R"( -- TScriptLeaseUpdater::OnGetLeaseInfo DECLARE $database AS Text; DECLARE $execution_id AS Text; - DECLARE $lease_deadline AS Timestamp; + DECLARE $lease_duration AS Interval; UPDATE `.metadata/script_execution_leases` - SET lease_deadline=$lease_deadline + SET lease_deadline=(CurrentUtcTimestamp() + $lease_duration) WHERE database = $database AND execution_id = $execution_id; )"; @@ -422,8 +424,8 @@ public: .AddParam("$execution_id") .Utf8(ExecutionId) .Build() - .AddParam("$lease_deadline") - .Timestamp(TInstant::Now() + LeaseDuration) + .AddParam("$lease_duration") + .Interval(static_cast<i64>(LeaseDuration.MicroSeconds())) .Build(); RunDataQuery(sql, &params, TTxControl::ContinueAndCommitTx()); @@ -435,13 +437,14 @@ public: } void OnFinish(Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues) override { - Send(Owner, new TEvScriptLeaseUpdateResponse(LeaseExists, status, std::move(issues))); + Send(Owner, new TEvScriptLeaseUpdateResponse(LeaseExists, LeaseDeadline, status, std::move(issues))); } private: const TString Database; const TString ExecutionId; const TDuration LeaseDuration; + TInstant LeaseDeadline; bool LeaseExists = true; }; @@ -1022,10 +1025,12 @@ private: NYql::TIssues Issues; }; -class 
TGetScriptExecutionOperationQueryActor : public TScriptExecutionFinisherBase { public: - explicit TGetScriptExecutionOperationActor(TEvGetScriptExecutionOperation::TPtr ev) - : Request(std::move(ev)) + TGetScriptExecutionOperationQueryActor(const TString& database, const NOperationId::TOperationId& operationId, bool finishIfLeaseExpired) + : Database(database) + , OperationId(operationId) + , FinishIfLeaseExpired(finishIfLeaseExpired) , StartActorTime(TInstant::Now()) {} @@ -1036,6 +1041,7 @@ public: DECLARE $execution_id AS Text; SELECT + run_script_actor_id, operation_status, execution_status, query_text, @@ -1057,21 +1063,21 @@ public: WHERE database = $database AND execution_id = $execution_id; )"; - TMaybe<TString> maybeExecutionId = ScriptExecutionIdFromOperation(Request->Get()->OperationId); + TMaybe<TString> maybeExecutionId = ScriptExecutionIdFromOperation(OperationId); Y_ENSURE(maybeExecutionId, "No execution id specified"); ExecutionId = *maybeExecutionId; NYdb::TParamsBuilder params; params .AddParam("$database") - .Utf8(Request->Get()->Database) + .Utf8(Database) .Build() .AddParam("$execution_id") .Utf8(ExecutionId) .Build(); RunDataQuery(sql, &params, TTxControl::BeginTx()); - SetQueryResultHandler(&TGetScriptExecutionOperationActor::OnGetInfo); + SetQueryResultHandler(&TGetScriptExecutionOperationQueryActor::OnGetInfo); } void OnGetInfo() { @@ -1092,7 +1098,7 @@ public: Ready = true; } - Metadata.set_execution_id(*ScriptExecutionIdFromOperation(Request->Get()->OperationId)); + Metadata.set_execution_id(*ScriptExecutionIdFromOperation(OperationId)); const TMaybe<i32> executionStatus = result.ColumnParser("execution_status").GetOptionalInt32(); if (executionStatus) { @@ -1151,6 +1157,11 @@ public: } } + const TMaybe<TString> runScriptActorIdString = result.ColumnParser("run_script_actor_id").GetOptionalUtf8(); + if (runScriptActorIdString) { + ScriptExecutionRunnerActorIdFromString(*runScriptActorIdString, RunScriptActorId); + } + if (!operationStatus) { 
// Check lease deadline NYdb::TResultSetParser deadlineResult(ResultSets[1]); @@ -1180,14 +1191,14 @@ public: return; } - if (*leaseDeadline < StartActorTime) { - LeaseExpired = true; - FinishScriptExecution(Request->Get()->Database, Metadata.execution_id(), Ydb::StatusIds::ABORTED, Ydb::Query::EXEC_STATUS_ABORTED, operationTtl, resultsTtl, Issues); - SetQueryResultHandler(&TGetScriptExecutionOperationActor::OnFinishOperation); + LeaseExpired = *leaseDeadline < StartActorTime; + if (LeaseExpired && FinishIfLeaseExpired) { + FinishScriptExecution(Database, Metadata.execution_id(), Ydb::StatusIds::ABORTED, Ydb::Query::EXEC_STATUS_ABORTED, operationTtl, resultsTtl, Issues); + SetQueryResultHandler(&TGetScriptExecutionOperationQueryActor::OnFinishOperation); } } - if (!LeaseExpired) { + if (!LeaseExpired || !FinishIfLeaseExpired) { CommitTransaction(); } } @@ -1205,22 +1216,114 @@ public: TMaybe<google::protobuf::Any> metadata; metadata.ConstructInPlace().PackFrom(Metadata); - Send(Request->Sender, new TEvGetScriptExecutionOperationResponse(Ready, Ydb::StatusIds::SUCCESS, std::move(Issues), std::move(metadata))); + Send(Owner, new TEvGetScriptExecutionOperationResponse(Ready, LeaseExpired, RunScriptActorId, Ydb::StatusIds::SUCCESS, std::move(Issues), std::move(metadata))); } else { - Send(Request->Sender, new TEvGetScriptExecutionOperationResponse(false, status, std::move(issues), Nothing())); + Send(Owner, new TEvGetScriptExecutionOperationResponse(false, LeaseExpired, RunScriptActorId, status, std::move(issues), Nothing())); } } private: - TEvGetScriptExecutionOperation::TPtr Request; + TString Database; + NOperationId::TOperationId OperationId; + bool FinishIfLeaseExpired; TInstant StartActorTime; TString ExecutionId; bool Ready = false; bool LeaseExpired = false; + TActorId RunScriptActorId; NYql::TIssues Issues; Ydb::Query::ExecuteScriptMetadata Metadata; }; +class TGetScriptExecutionOperationActor : public TActorBootstrapped<TGetScriptExecutionOperationActor> { + 
using TBase = TActorBootstrapped<TGetScriptExecutionOperationActor>; + + inline static const TDuration CHECK_ALIVE_REQUEST_TIMEOUT = TDuration::Seconds(60); + +public: + explicit TGetScriptExecutionOperationActor(TEvGetScriptExecutionOperation::TPtr ev) + : Request(std::move(ev)) + {} + + void Bootstrap() { + CreateGetScriptExecutionOperationQuery(false); + Become(&TGetScriptExecutionOperationActor::StateFunc); + } + +private: + STRICT_STFUNC(StateFunc, + hFunc(TEvGetScriptExecutionOperationResponse, Handle); + hFunc(TEvCheckAliveResponse, Handle); + hFunc(TEvents::TEvWakeup, Handle); + hFunc(NActors::TEvents::TEvUndelivered, Handle); + hFunc(NActors::TEvInterconnect::TEvNodeDisconnected, Handle); + ) + + void CreateGetScriptExecutionOperationQuery(bool finishIfLeaseExpired) { + Register(new TGetScriptExecutionOperationQueryActor(Request->Get()->Database, Request->Get()->OperationId, finishIfLeaseExpired)); + } + + void CreateFinishScriptExecutionOperationQuery() { + if (!WaitFinishQuery) { + WaitFinishQuery = true; + CreateGetScriptExecutionOperationQuery(true); + } + } + + void Handle(TEvGetScriptExecutionOperationResponse::TPtr& ev) { + Response = std::move(ev); + + if (WaitFinishQuery || !Response->Get()->LeaseExpired) { + Reply(); + return; + } + + Schedule(CHECK_ALIVE_REQUEST_TIMEOUT, new TEvents::TEvWakeup()); + + NActors::TActorId runScriptActor = Response->Get()->RunScriptActorId; + ui64 flags = IEventHandle::FlagTrackDelivery; + if (runScriptActor.NodeId() != SelfId().NodeId()) { + flags |= IEventHandle::FlagSubscribeOnSession; + SubscribedOnSession = runScriptActor.NodeId(); + } + Send(runScriptActor, new TEvCheckAliveRequest(), flags); + } + + void Handle(TEvCheckAliveResponse::TPtr&) { + Reply(); + } + + void Handle(TEvents::TEvWakeup::TPtr&) { + CreateFinishScriptExecutionOperationQuery(); + } + + void Handle(NActors::TEvents::TEvUndelivered::TPtr&) { + CreateFinishScriptExecutionOperationQuery(); + } + + void 
Handle(NActors::TEvInterconnect::TEvNodeDisconnected::TPtr&) { + CreateFinishScriptExecutionOperationQuery(); + } + + void Reply() { + Send(Request->Sender, Response->Release().Release()); + PassAway(); + } + + void PassAway() override { + if (SubscribedOnSession) { + Send(TActivationContext::InterconnectProxy(*SubscribedOnSession), new TEvents::TEvUnsubscribe()); + } + TBase::PassAway(); + } + +private: + TEvGetScriptExecutionOperation::TPtr Request; + TEvGetScriptExecutionOperationResponse::TPtr Response; + bool WaitFinishQuery = false; + TMaybe<ui32> SubscribedOnSession; +}; + class TListScriptExecutionOperationsQuery : public TQueryBase { public: TListScriptExecutionOperationsQuery(const TString& database, const TString& pageToken, ui64 pageSize) diff --git a/ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp b/ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp index 669512139b0..29c8795c859 100644 --- a/ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp +++ b/ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp @@ -78,6 +78,7 @@ private: hFunc(TEvScriptLeaseUpdateResponse, Handle); hFunc(TEvSaveScriptResultMetaFinished, Handle); hFunc(TEvSaveScriptResultFinished, Handle); + hFunc(TEvCheckAliveRequest, Handle); ) void SendToKqpProxy(THolder<NActors::IEventBase> ev) { @@ -133,6 +134,15 @@ private: SendToKqpProxy(std::move(ev)); } + void Handle(TEvCheckAliveRequest::TPtr& ev) { + Send(ev->Sender, new TEvCheckAliveResponse()); + } + + void RunLeaseUpdater() { + Register(CreateScriptLeaseUpdateActor(SelfId(), Database, ExecutionId, LeaseDuration)); + LeaseUpdateQueryRunning = true; + } + // TODO: remove this after there will be a normal way to store results and generate execution id void Handle(NActors::TEvents::TEvWakeup::TPtr& ev) { switch (ev->Get()->Tag) { @@ -143,15 +153,9 @@ private: break; case EWakeUp::UpdateLeaseEvent: - if (RunState == ERunState::Cancelled || RunState == ERunState::Cancelling || RunState == ERunState::Finished || RunState == 
ERunState::Finishing) { - break; - } - - if (!LeaseUpdateQueryRunning && !FinalStatusIsSaved) { - Register(CreateScriptLeaseUpdateActor(SelfId(), Database, ExecutionId, LeaseDuration)); - LeaseUpdateQueryRunning = true; + if (IsExecuting() && !FinalStatusIsSaved) { + RunLeaseUpdater(); } - Schedule(LeaseDuration / LEASE_UPDATE_FREQUENCY, new NActors::TEvents::TEvWakeup(EWakeUp::UpdateLeaseEvent)); break; } } @@ -192,6 +196,15 @@ private: } else if (IsExecuting() && ev->Get()->Status != Ydb::StatusIds::SUCCESS) { Finish(ev->Get()->Status); } + + if (IsExecuting()) { + TInstant leaseUpdateTime = ev->Get()->CurrentDeadline - LeaseDuration / LEASE_UPDATE_FREQUENCY; + if (TInstant::Now() >= leaseUpdateTime) { + RunLeaseUpdater(); + } else { + Schedule(leaseUpdateTime, new NActors::TEvents::TEvWakeup(EWakeUp::UpdateLeaseEvent)); + } + } } // Event in case of error in registering script in database diff --git a/ydb/core/protos/kqp.proto b/ydb/core/protos/kqp.proto index 835bc70a0cc..4fd27d332c5 100644 --- a/ydb/core/protos/kqp.proto +++ b/ydb/core/protos/kqp.proto @@ -676,6 +676,13 @@ message TEvCancelScriptExecutionResponse { repeated Ydb.Issue.IssueMessage Issues = 2; } +// Request that is sent to run script actor to check his existence. +message TEvCheckAliveRequest { +} + +message TEvCheckAliveResponse { +} + // stored in column "meta" of .metadata/script_executions table message TScriptExecutionOperationMeta { optional google.protobuf.Duration OperationTtl = 1; |