diff options
author | Vasily Gerasimov <UgnineSirdis@gmail.com> | 2022-05-19 21:16:47 +0300 |
---|---|---|
committer | Vasily Gerasimov <UgnineSirdis@gmail.com> | 2022-05-19 21:16:47 +0300 |
commit | b6eecf2aaed456793a9f0e559af11e63bace7e60 (patch) | |
tree | 1b4ff1c7b4891a0ba2cd50965b252a7a1e704c8e | |
parent | 0393681f68a46b340f802ada83a7b88b2e46d648 (diff) | |
download | ydb-b6eecf2aaed456793a9f0e559af11e63bace7e60.tar.gz |
Slow checkpoint diagnostic in compute actor
ref:454db5d21f8b5d696674df5652a8a3e2cc76c1ea
-rw-r--r-- | ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.cpp | 55 | ||||
-rw-r--r-- | ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.h | 7 |
2 files changed, 56 insertions, 6 deletions
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.cpp b/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.cpp index d80a4f97008..e32000da668 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.cpp +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.cpp @@ -29,6 +29,8 @@ using namespace NActors; namespace { +constexpr TDuration SLOW_CHECKPOINT_DURATION = TDuration::Minutes(1); + TString MakeStringForLog(const NDqProto::TCheckpoint& checkpoint) { return TStringBuilder() << checkpoint.GetGeneration() << "." << checkpoint.GetId(); } @@ -133,6 +135,7 @@ STRICT_STFUNC_EXC(TDqComputeActorCheckpoints::StateFunc, hFunc(NActors::TEvInterconnect::TEvNodeDisconnected, Handle); hFunc(NActors::TEvInterconnect::TEvNodeConnected, Handle); hFunc(TEvRetryQueuePrivate::TEvRetry, Handle); + hFunc(TEvents::TEvWakeup, Handle); cFunc(TEvents::TEvPoisonPill::EventType, PassAway);, ExceptionFunc(std::exception, HandleException) ) @@ -202,9 +205,10 @@ void TDqComputeActorCheckpoints::Handle(TEvDqCompute::TEvNewCheckpointCoordinato Y_VERIFY(EventsQueue.OnEventReceived(ev->Get())); EventsQueue.Send(new TEvDqCompute::TEvNewCheckpointCoordinatorAck()); - if (PendingCheckpoint) { + const bool resumeInputs = bool(PendingCheckpoint); + AbortCheckpoint(); + if (resumeInputs) { LOG_I("Drop pending checkpoint since coordinator is stale"); - PendingCheckpoint.Clear(); ComputeActor->ResumeInputs(); } } @@ -217,7 +221,7 @@ void TDqComputeActorCheckpoints::Handle(TEvDqCompute::TEvInjectCheckpoint::TPtr& YQL_ENSURE(IngressTask, "Shouldn't inject barriers into non-ingress tasks"); YQL_ENSURE(!PendingCheckpoint); - PendingCheckpoint = ev->Get()->Record.GetCheckpoint(); + StartCheckpoint(ev->Get()->Record.GetCheckpoint()); LOG_CP_I("Got TEvInjectCheckpoint"); ComputeActor->ResumeExecution(); } @@ -227,6 +231,8 @@ void TDqComputeActorCheckpoints::Handle(TEvDqCompute::TEvSaveTaskStateResult::TP return; } + SavingToDatabase = false; + CheckpointStartTime = TInstant::Zero(); EventsQueue.Send(ev->Release().Release(), ev->Cookie); } @@ -376,6 +382,25 @@ void TDqComputeActorCheckpoints::Handle(TEvRetryQueuePrivate::TEvRetry::TPtr& ev EventsQueue.Retry(); } +void TDqComputeActorCheckpoints::Handle(NActors::TEvents::TEvWakeup::TPtr&) { + if (CheckpointStartTime && (TActivationContext::Now() - CheckpointStartTime) >= SLOW_CHECKPOINT_DURATION) { + TStringBuilder checkpointDiagnostic; + if (PendingCheckpoint.Checkpoint) { + checkpointDiagnostic << "[Checkpoint " << MakeStringForLog(*PendingCheckpoint.Checkpoint) << "] "; + } + checkpointDiagnostic << "Slow checkpoint. Duration: " << (TInstant::Now() - CheckpointStartTime).Seconds() << 's'; + if (PendingCheckpoint) { + checkpointDiagnostic << " CA: " << PendingCheckpoint.SavedComputeActorState; + if (PendingCheckpoint.SinksCount) { + checkpointDiagnostic << " Sinks: " << PendingCheckpoint.SavedSinkStatesCount << '/' << PendingCheckpoint.SinksCount; + } + } + checkpointDiagnostic << " SavingToDatabase: " << SavingToDatabase; + LOG_W(checkpointDiagnostic); + } + Schedule(SLOW_CHECKPOINT_DURATION, new NActors::TEvents::TEvWakeup()); +} + bool TDqComputeActorCheckpoints::HasPendingCheckpoint() const { return PendingCheckpoint; } @@ -410,7 +435,7 @@ bool TDqComputeActorCheckpoints::SaveState() { PendingCheckpoint.SavedComputeActorState = true; ComputeActor->SaveState(*PendingCheckpoint.Checkpoint, PendingCheckpoint.ComputeActorState); } catch (const std::exception& e) { - PendingCheckpoint.Clear(); + AbortCheckpoint(); LOG_CP_E("Failed to save state: " << e.what()); auto resultEv = MakeHolder<TEvDqCompute::TEvSaveTaskStateResult>(); @@ -428,7 +453,7 @@ bool TDqComputeActorCheckpoints::SaveState() { void TDqComputeActorCheckpoints::RegisterCheckpoint(const NDqProto::TCheckpoint& checkpoint, ui64 channelId) { if (!PendingCheckpoint) { - PendingCheckpoint = checkpoint; + StartCheckpoint(checkpoint); } else { YQL_ENSURE(PendingCheckpoint.Checkpoint->GetGeneration() == checkpoint.GetGeneration()); YQL_ENSURE(PendingCheckpoint.Checkpoint->GetId() == checkpoint.GetId()); @@ -437,6 +462,23 @@ void TDqComputeActorCheckpoints::RegisterCheckpoint(const NDqProto::TCheckpoint& ComputeActor->ResumeExecution(); } +void TDqComputeActorCheckpoints::StartCheckpoint(const NDqProto::TCheckpoint& checkpoint) { + PendingCheckpoint = checkpoint; + CheckpointStartTime = TActivationContext::Now(); + SavingToDatabase = false; + + if (!SlowCheckpointsMonitoringStarted) { + SlowCheckpointsMonitoringStarted = true; + Schedule(SLOW_CHECKPOINT_DURATION, new NActors::TEvents::TEvWakeup()); + } +} + +void TDqComputeActorCheckpoints::AbortCheckpoint() { + PendingCheckpoint.Clear(); + CheckpointStartTime = TInstant::Zero(); + SavingToDatabase = false; +} + void TDqComputeActorCheckpoints::OnSinkStateSaved(NDqProto::TSinkState&& state, ui64 outputIndex, const NDqProto::TCheckpoint& checkpoint) { Y_VERIFY(CheckpointCoordinator); Y_VERIFY(checkpoint.GetGeneration() <= CheckpointCoordinator->Generation); @@ -468,8 +510,9 @@ void TDqComputeActorCheckpoints::TryToSavePendingCheckpoint() { saveTaskStateRequest->State.Swap(&PendingCheckpoint.ComputeActorState); Send(CheckpointStorage, std::move(saveTaskStateRequest)); - LOG_CP_I("Task checkpoint is done"); + LOG_CP_I("Task checkpoint is done. Send to storage"); PendingCheckpoint.Clear(); + SavingToDatabase = true; } } diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.h index 1c9f2334473..4ec41f04e4c 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.h @@ -98,6 +98,8 @@ public: bool SaveState(); NDqProto::TCheckpoint GetPendingCheckpoint() const; void RegisterCheckpoint(const NDqProto::TCheckpoint& checkpoint, ui64 channelId); + void StartCheckpoint(const NDqProto::TCheckpoint& checkpoint); + void AbortCheckpoint(); // Sink support. void OnSinkStateSaved(NDqProto::TSinkState&& state, ui64 outputIndex, const NDqProto::TCheckpoint& checkpoint); @@ -126,6 +128,7 @@ private: void Handle(NActors::TEvInterconnect::TEvNodeDisconnected::TPtr& ev); void Handle(NActors::TEvInterconnect::TEvNodeConnected::TPtr& ev); void Handle(TEvRetryQueuePrivate::TEvRetry::TPtr& ev); + void Handle(NActors::TEvents::TEvWakeup::TPtr& ev); void HandleException(const std::exception& err); void PassAway() override; @@ -153,6 +156,10 @@ private: NYql::NDqProto::NDqStateLoadPlan::TTaskPlan TaskLoadPlan; NDqProto::TCheckpoint RestoringTaskRunnerForCheckpoint; ui64 RestoringTaskRunnerForEvent; + + bool SlowCheckpointsMonitoringStarted = false; + TInstant CheckpointStartTime; + bool SavingToDatabase = false; }; } // namespace NYql::NDq |