aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorgrigoriypisar <grigoriypisar@yandex-team.com>2023-08-25 19:14:10 +0300
committergrigoriypisar <grigoriypisar@yandex-team.com>2023-08-25 19:33:35 +0300
commit2a83f4fc1bdbc2452f24087cdeb2f68a35612507 (patch)
tree933460ec042dd0a0fdb389bf75b46227ecec407e
parentf36aa51c663ece905630866ce7eff969024c7274 (diff)
downloadydb-2a83f4fc1bdbc2452f24087cdeb2f68a35612507.tar.gz
Lease stabilisation
Temp changes
-rw-r--r--ydb/core/kqp/common/events/script_executions.h18
-rw-r--r--ydb/core/kqp/common/simple/kqp_event_ids.h2
-rw-r--r--ydb/core/kqp/proxy_service/kqp_proxy_service.cpp12
-rw-r--r--ydb/core/kqp/proxy_service/kqp_script_executions.cpp143
-rw-r--r--ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp29
-rw-r--r--ydb/core/protos/kqp.proto7
6 files changed, 175 insertions, 36 deletions
diff --git a/ydb/core/kqp/common/events/script_executions.h b/ydb/core/kqp/common/events/script_executions.h
index f00d6294a5b..3627d9d6760 100644
--- a/ydb/core/kqp/common/events/script_executions.h
+++ b/ydb/core/kqp/common/events/script_executions.h
@@ -1,5 +1,6 @@
#pragma once
#include <ydb/core/kqp/common/simple/kqp_event_ids.h>
+#include <ydb/core/protos/kqp.pb.h>
#include <ydb/library/yql/public/issue/yql_issue.h>
#include <ydb/public/api/protos/ydb_operation.pb.h>
#include <ydb/public/api/protos/ydb_status_codes.pb.h>
@@ -47,8 +48,10 @@ struct TEvGetScriptExecutionOperation : public NActors::TEventLocal<TEvGetScript
};
struct TEvGetScriptExecutionOperationResponse : public NActors::TEventLocal<TEvGetScriptExecutionOperationResponse, TKqpScriptExecutionEvents::EvGetScriptExecutionOperationResponse> {
- TEvGetScriptExecutionOperationResponse(bool ready, Ydb::StatusIds::StatusCode status, NYql::TIssues issues, TMaybe<google::protobuf::Any> metadata)
+ TEvGetScriptExecutionOperationResponse(bool ready, bool leaseExpired, TActorId runScriptActorId, Ydb::StatusIds::StatusCode status, NYql::TIssues issues, TMaybe<google::protobuf::Any> metadata)
: Ready(ready)
+ , LeaseExpired(leaseExpired)
+ , RunScriptActorId(runScriptActorId)
, Status(status)
, Issues(std::move(issues))
, Metadata(std::move(metadata))
@@ -57,12 +60,15 @@ struct TEvGetScriptExecutionOperationResponse : public NActors::TEventLocal<TEvG
TEvGetScriptExecutionOperationResponse(Ydb::StatusIds::StatusCode status, NYql::TIssues issues)
: Ready(false)
+ , LeaseExpired(false)
, Status(status)
, Issues(std::move(issues))
{
}
bool Ready;
+ bool LeaseExpired;
+ TActorId RunScriptActorId;
Ydb::StatusIds::StatusCode Status;
NYql::TIssues Issues;
TMaybe<google::protobuf::Any> Metadata;
@@ -102,18 +108,26 @@ struct TEvListScriptExecutionOperationsResponse : public NActors::TEventLocal<TE
};
struct TEvScriptLeaseUpdateResponse : public NActors::TEventLocal<TEvScriptLeaseUpdateResponse, TKqpScriptExecutionEvents::EvScriptLeaseUpdateResponse> {
- TEvScriptLeaseUpdateResponse(bool executionEntryExists, Ydb::StatusIds::StatusCode status, NYql::TIssues issues)
+ TEvScriptLeaseUpdateResponse(bool executionEntryExists, TInstant currentDeadline, Ydb::StatusIds::StatusCode status, NYql::TIssues issues)
: ExecutionEntryExists(executionEntryExists)
+ , CurrentDeadline(currentDeadline)
, Status(status)
, Issues(std::move(issues))
{
}
bool ExecutionEntryExists;
+ TInstant CurrentDeadline;
Ydb::StatusIds::StatusCode Status;
NYql::TIssues Issues;
};
+struct TEvCheckAliveRequest : public NActors::TEventPB<TEvCheckAliveRequest, NKikimrKqp::TEvCheckAliveRequest, TKqpScriptExecutionEvents::EvCheckAliveRequest> {
+};
+
+struct TEvCheckAliveResponse : public NActors::TEventPB<TEvCheckAliveResponse, NKikimrKqp::TEvCheckAliveResponse, TKqpScriptExecutionEvents::EvCheckAliveResponse> {
+};
+
struct TEvCancelScriptExecutionOperation : public NActors::TEventLocal<TEvCancelScriptExecutionOperation, TKqpScriptExecutionEvents::EvCancelScriptExecutionOperation> {
explicit TEvCancelScriptExecutionOperation(const TString& database, const NOperationId::TOperationId& id)
: Database(database)
diff --git a/ydb/core/kqp/common/simple/kqp_event_ids.h b/ydb/core/kqp/common/simple/kqp_event_ids.h
index 83327d6d0d5..ea89513a22a 100644
--- a/ydb/core/kqp/common/simple/kqp_event_ids.h
+++ b/ydb/core/kqp/common/simple/kqp_event_ids.h
@@ -139,6 +139,8 @@ struct TKqpScriptExecutionEvents {
EvForgetScriptExecutionOperationResponse,
EvSaveScriptResultMetaFinished,
EvSaveScriptResultFinished,
+ EvCheckAliveRequest,
+ EvCheckAliveResponse,
};
};
diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp
index b6e083b14bb..053556b2623 100644
--- a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp
+++ b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp
@@ -659,7 +659,7 @@ public:
void Handle(TEvKqp::TEvScriptRequest::TPtr& ev) {
if (CheckScriptExecutionsTablesReady<TEvKqp::TEvScriptResponse>(ev)) {
- Register(CreateScriptExecutionCreatorActor(std::move(ev)));
+ Register(CreateScriptExecutionCreatorActor(std::move(ev)), TMailboxType::HTSwap, AppData()->SystemPoolId);
}
}
@@ -1464,7 +1464,7 @@ private:
switch (ScriptExecutionsCreationStatus) {
case EScriptExecutionsCreationStatus::NotStarted:
ScriptExecutionsCreationStatus = EScriptExecutionsCreationStatus::Pending;
- Register(CreateScriptExecutionsTablesCreator(MakeHolder<TEvPrivate::TEvScriptExecutionsTablesCreationFinished>()));
+ Register(CreateScriptExecutionsTablesCreator(MakeHolder<TEvPrivate::TEvScriptExecutionsTablesCreationFinished>()), TMailboxType::HTSwap, AppData()->SystemPoolId);
[[fallthrough]];
case EScriptExecutionsCreationStatus::Pending:
if (DelayedEventsQueue.size() < 10000) {
@@ -1490,25 +1490,25 @@ private:
void Handle(NKqp::TEvForgetScriptExecutionOperation::TPtr& ev) {
if (CheckScriptExecutionsTablesReady<TEvForgetScriptExecutionOperationResponse>(ev)) {
- Register(CreateForgetScriptExecutionOperationActor(std::move(ev)));
+ Register(CreateForgetScriptExecutionOperationActor(std::move(ev)), TMailboxType::HTSwap, AppData()->SystemPoolId);
}
}
void Handle(NKqp::TEvGetScriptExecutionOperation::TPtr& ev) {
if (CheckScriptExecutionsTablesReady<TEvGetScriptExecutionOperationResponse>(ev)) {
- Register(CreateGetScriptExecutionOperationActor(std::move(ev)));
+ Register(CreateGetScriptExecutionOperationActor(std::move(ev)), TMailboxType::HTSwap, AppData()->SystemPoolId);
}
}
void Handle(NKqp::TEvListScriptExecutionOperations::TPtr& ev) {
if (CheckScriptExecutionsTablesReady<TEvListScriptExecutionOperationsResponse>(ev)) {
- Register(CreateListScriptExecutionOperationsActor(std::move(ev)));
+ Register(CreateListScriptExecutionOperationsActor(std::move(ev)), TMailboxType::HTSwap, AppData()->SystemPoolId);
}
}
void Handle(NKqp::TEvCancelScriptExecutionOperation::TPtr& ev) {
if (CheckScriptExecutionsTablesReady<TEvCancelScriptExecutionOperationResponse>(ev)) {
- Register(CreateCancelScriptExecutionOperationActor(std::move(ev)));
+ Register(CreateCancelScriptExecutionOperationActor(std::move(ev)), TMailboxType::HTSwap, AppData()->SystemPoolId);
}
}
diff --git a/ydb/core/kqp/proxy_service/kqp_script_executions.cpp b/ydb/core/kqp/proxy_service/kqp_script_executions.cpp
index 99a2dad4c15..d226a491d9d 100644
--- a/ydb/core/kqp/proxy_service/kqp_script_executions.cpp
+++ b/ydb/core/kqp/proxy_service/kqp_script_executions.cpp
@@ -403,14 +403,16 @@ public:
return;
}
+ LeaseDeadline = TInstant::Now() + LeaseDuration;
+
TString sql = R"(
-- TScriptLeaseUpdater::OnGetLeaseInfo
DECLARE $database AS Text;
DECLARE $execution_id AS Text;
- DECLARE $lease_deadline AS Timestamp;
+ DECLARE $lease_duration AS Interval;
UPDATE `.metadata/script_execution_leases`
- SET lease_deadline=$lease_deadline
+ SET lease_deadline=(CurrentUtcTimestamp() + $lease_duration)
WHERE database = $database AND execution_id = $execution_id;
)";
@@ -422,8 +424,8 @@ public:
.AddParam("$execution_id")
.Utf8(ExecutionId)
.Build()
- .AddParam("$lease_deadline")
- .Timestamp(TInstant::Now() + LeaseDuration)
+ .AddParam("$lease_duration")
+ .Interval(static_cast<i64>(LeaseDuration.MicroSeconds()))
.Build();
RunDataQuery(sql, &params, TTxControl::ContinueAndCommitTx());
@@ -435,13 +437,14 @@ public:
}
void OnFinish(Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues) override {
- Send(Owner, new TEvScriptLeaseUpdateResponse(LeaseExists, status, std::move(issues)));
+ Send(Owner, new TEvScriptLeaseUpdateResponse(LeaseExists, LeaseDeadline, status, std::move(issues)));
}
private:
const TString Database;
const TString ExecutionId;
const TDuration LeaseDuration;
+ TInstant LeaseDeadline;
bool LeaseExists = true;
};
@@ -1022,10 +1025,12 @@ private:
NYql::TIssues Issues;
};
-class TGetScriptExecutionOperationActor : public TScriptExecutionFinisherBase {
+class TGetScriptExecutionOperationQueryActor : public TScriptExecutionFinisherBase {
public:
- explicit TGetScriptExecutionOperationActor(TEvGetScriptExecutionOperation::TPtr ev)
- : Request(std::move(ev))
+ TGetScriptExecutionOperationQueryActor(const TString& database, const NOperationId::TOperationId& operationId, bool finishIfLeaseExpired)
+ : Database(database)
+ , OperationId(operationId)
+ , FinishIfLeaseExpired(finishIfLeaseExpired)
, StartActorTime(TInstant::Now())
{}
@@ -1036,6 +1041,7 @@ public:
DECLARE $execution_id AS Text;
SELECT
+ run_script_actor_id,
operation_status,
execution_status,
query_text,
@@ -1057,21 +1063,21 @@ public:
WHERE database = $database AND execution_id = $execution_id;
)";
- TMaybe<TString> maybeExecutionId = ScriptExecutionIdFromOperation(Request->Get()->OperationId);
+ TMaybe<TString> maybeExecutionId = ScriptExecutionIdFromOperation(OperationId);
Y_ENSURE(maybeExecutionId, "No execution id specified");
ExecutionId = *maybeExecutionId;
NYdb::TParamsBuilder params;
params
.AddParam("$database")
- .Utf8(Request->Get()->Database)
+ .Utf8(Database)
.Build()
.AddParam("$execution_id")
.Utf8(ExecutionId)
.Build();
RunDataQuery(sql, &params, TTxControl::BeginTx());
- SetQueryResultHandler(&TGetScriptExecutionOperationActor::OnGetInfo);
+ SetQueryResultHandler(&TGetScriptExecutionOperationQueryActor::OnGetInfo);
}
void OnGetInfo() {
@@ -1092,7 +1098,7 @@ public:
Ready = true;
}
- Metadata.set_execution_id(*ScriptExecutionIdFromOperation(Request->Get()->OperationId));
+ Metadata.set_execution_id(*ScriptExecutionIdFromOperation(OperationId));
const TMaybe<i32> executionStatus = result.ColumnParser("execution_status").GetOptionalInt32();
if (executionStatus) {
@@ -1151,6 +1157,11 @@ public:
}
}
+ const TMaybe<TString> runScriptActorIdString = result.ColumnParser("run_script_actor_id").GetOptionalUtf8();
+ if (runScriptActorIdString) {
+ ScriptExecutionRunnerActorIdFromString(*runScriptActorIdString, RunScriptActorId);
+ }
+
if (!operationStatus) {
// Check lease deadline
NYdb::TResultSetParser deadlineResult(ResultSets[1]);
@@ -1180,14 +1191,14 @@ public:
return;
}
- if (*leaseDeadline < StartActorTime) {
- LeaseExpired = true;
- FinishScriptExecution(Request->Get()->Database, Metadata.execution_id(), Ydb::StatusIds::ABORTED, Ydb::Query::EXEC_STATUS_ABORTED, operationTtl, resultsTtl, Issues);
- SetQueryResultHandler(&TGetScriptExecutionOperationActor::OnFinishOperation);
+ LeaseExpired = *leaseDeadline < StartActorTime;
+ if (LeaseExpired && FinishIfLeaseExpired) {
+ FinishScriptExecution(Database, Metadata.execution_id(), Ydb::StatusIds::ABORTED, Ydb::Query::EXEC_STATUS_ABORTED, operationTtl, resultsTtl, Issues);
+ SetQueryResultHandler(&TGetScriptExecutionOperationQueryActor::OnFinishOperation);
}
}
- if (!LeaseExpired) {
+ if (!LeaseExpired || !FinishIfLeaseExpired) {
CommitTransaction();
}
}
@@ -1205,22 +1216,114 @@ public:
TMaybe<google::protobuf::Any> metadata;
metadata.ConstructInPlace().PackFrom(Metadata);
- Send(Request->Sender, new TEvGetScriptExecutionOperationResponse(Ready, Ydb::StatusIds::SUCCESS, std::move(Issues), std::move(metadata)));
+ Send(Owner, new TEvGetScriptExecutionOperationResponse(Ready, LeaseExpired, RunScriptActorId, Ydb::StatusIds::SUCCESS, std::move(Issues), std::move(metadata)));
} else {
- Send(Request->Sender, new TEvGetScriptExecutionOperationResponse(false, status, std::move(issues), Nothing()));
+ Send(Owner, new TEvGetScriptExecutionOperationResponse(false, LeaseExpired, RunScriptActorId, status, std::move(issues), Nothing()));
}
}
private:
- TEvGetScriptExecutionOperation::TPtr Request;
+ TString Database;
+ NOperationId::TOperationId OperationId;
+ bool FinishIfLeaseExpired;
TInstant StartActorTime;
TString ExecutionId;
bool Ready = false;
bool LeaseExpired = false;
+ TActorId RunScriptActorId;
NYql::TIssues Issues;
Ydb::Query::ExecuteScriptMetadata Metadata;
};
+class TGetScriptExecutionOperationActor : public TActorBootstrapped<TGetScriptExecutionOperationActor> {
+ using TBase = TActorBootstrapped<TGetScriptExecutionOperationActor>;
+
+ inline static const TDuration CHECK_ALIVE_REQUEST_TIMEOUT = TDuration::Seconds(60);
+
+public:
+ explicit TGetScriptExecutionOperationActor(TEvGetScriptExecutionOperation::TPtr ev)
+ : Request(std::move(ev))
+ {}
+
+ void Bootstrap() {
+ CreateGetScriptExecutionOperationQuery(false);
+ Become(&TGetScriptExecutionOperationActor::StateFunc);
+ }
+
+private:
+ STRICT_STFUNC(StateFunc,
+ hFunc(TEvGetScriptExecutionOperationResponse, Handle);
+ hFunc(TEvCheckAliveResponse, Handle);
+ hFunc(TEvents::TEvWakeup, Handle);
+ hFunc(NActors::TEvents::TEvUndelivered, Handle);
+ hFunc(NActors::TEvInterconnect::TEvNodeDisconnected, Handle);
+ )
+
+ void CreateGetScriptExecutionOperationQuery(bool finishIfLeaseExpired) {
+ Register(new TGetScriptExecutionOperationQueryActor(Request->Get()->Database, Request->Get()->OperationId, finishIfLeaseExpired));
+ }
+
+ void CreateFinishScriptExecutionOperationQuery() {
+ if (!WaitFinishQuery) {
+ WaitFinishQuery = true;
+ CreateGetScriptExecutionOperationQuery(true);
+ }
+ }
+
+ void Handle(TEvGetScriptExecutionOperationResponse::TPtr& ev) {
+ Response = std::move(ev);
+
+ if (WaitFinishQuery || !Response->Get()->LeaseExpired) {
+ Reply();
+ return;
+ }
+
+ Schedule(CHECK_ALIVE_REQUEST_TIMEOUT, new TEvents::TEvWakeup());
+
+ NActors::TActorId runScriptActor = Response->Get()->RunScriptActorId;
+ ui64 flags = IEventHandle::FlagTrackDelivery;
+ if (runScriptActor.NodeId() != SelfId().NodeId()) {
+ flags |= IEventHandle::FlagSubscribeOnSession;
+ SubscribedOnSession = runScriptActor.NodeId();
+ }
+ Send(runScriptActor, new TEvCheckAliveRequest(), flags);
+ }
+
+ void Handle(TEvCheckAliveResponse::TPtr&) {
+ Reply();
+ }
+
+ void Handle(TEvents::TEvWakeup::TPtr&) {
+ CreateFinishScriptExecutionOperationQuery();
+ }
+
+ void Handle(NActors::TEvents::TEvUndelivered::TPtr&) {
+ CreateFinishScriptExecutionOperationQuery();
+ }
+
+ void Handle(NActors::TEvInterconnect::TEvNodeDisconnected::TPtr&) {
+ CreateFinishScriptExecutionOperationQuery();
+ }
+
+ void Reply() {
+ Send(Request->Sender, Response->Release().Release());
+ PassAway();
+ }
+
+ void PassAway() override {
+ if (SubscribedOnSession) {
+ Send(TActivationContext::InterconnectProxy(*SubscribedOnSession), new TEvents::TEvUnsubscribe());
+ }
+ TBase::PassAway();
+ }
+
+private:
+ TEvGetScriptExecutionOperation::TPtr Request;
+ TEvGetScriptExecutionOperationResponse::TPtr Response;
+ bool WaitFinishQuery = false;
+ TMaybe<ui32> SubscribedOnSession;
+};
+
class TListScriptExecutionOperationsQuery : public TQueryBase {
public:
TListScriptExecutionOperationsQuery(const TString& database, const TString& pageToken, ui64 pageSize)
diff --git a/ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp b/ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp
index 669512139b0..29c8795c859 100644
--- a/ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp
+++ b/ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp
@@ -78,6 +78,7 @@ private:
hFunc(TEvScriptLeaseUpdateResponse, Handle);
hFunc(TEvSaveScriptResultMetaFinished, Handle);
hFunc(TEvSaveScriptResultFinished, Handle);
+ hFunc(TEvCheckAliveRequest, Handle);
)
void SendToKqpProxy(THolder<NActors::IEventBase> ev) {
@@ -133,6 +134,15 @@ private:
SendToKqpProxy(std::move(ev));
}
+ void Handle(TEvCheckAliveRequest::TPtr& ev) {
+ Send(ev->Sender, new TEvCheckAliveResponse());
+ }
+
+ void RunLeaseUpdater() {
+ Register(CreateScriptLeaseUpdateActor(SelfId(), Database, ExecutionId, LeaseDuration));
+ LeaseUpdateQueryRunning = true;
+ }
+
// TODO: remove this after there will be a normal way to store results and generate execution id
void Handle(NActors::TEvents::TEvWakeup::TPtr& ev) {
switch (ev->Get()->Tag) {
@@ -143,15 +153,9 @@ private:
break;
case EWakeUp::UpdateLeaseEvent:
- if (RunState == ERunState::Cancelled || RunState == ERunState::Cancelling || RunState == ERunState::Finished || RunState == ERunState::Finishing) {
- break;
- }
-
- if (!LeaseUpdateQueryRunning && !FinalStatusIsSaved) {
- Register(CreateScriptLeaseUpdateActor(SelfId(), Database, ExecutionId, LeaseDuration));
- LeaseUpdateQueryRunning = true;
+ if (IsExecuting() && !FinalStatusIsSaved) {
+ RunLeaseUpdater();
}
- Schedule(LeaseDuration / LEASE_UPDATE_FREQUENCY, new NActors::TEvents::TEvWakeup(EWakeUp::UpdateLeaseEvent));
break;
}
}
@@ -192,6 +196,15 @@ private:
} else if (IsExecuting() && ev->Get()->Status != Ydb::StatusIds::SUCCESS) {
Finish(ev->Get()->Status);
}
+
+ if (IsExecuting()) {
+ TInstant leaseUpdateTime = ev->Get()->CurrentDeadline - LeaseDuration / LEASE_UPDATE_FREQUENCY;
+ if (TInstant::Now() >= leaseUpdateTime) {
+ RunLeaseUpdater();
+ } else {
+ Schedule(leaseUpdateTime, new NActors::TEvents::TEvWakeup(EWakeUp::UpdateLeaseEvent));
+ }
+ }
}
// Event in case of error in registering script in database
diff --git a/ydb/core/protos/kqp.proto b/ydb/core/protos/kqp.proto
index 835bc70a0cc..4fd27d332c5 100644
--- a/ydb/core/protos/kqp.proto
+++ b/ydb/core/protos/kqp.proto
@@ -676,6 +676,13 @@ message TEvCancelScriptExecutionResponse {
repeated Ydb.Issue.IssueMessage Issues = 2;
}
+// Request that is sent to run script actor to check his existence.
+message TEvCheckAliveRequest {
+}
+
+message TEvCheckAliveResponse {
+}
+
// stored in column "meta" of .metadata/script_executions table
message TScriptExecutionOperationMeta {
optional google.protobuf.Duration OperationTtl = 1;