aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVasily Gerasimov <UgnineSirdis@gmail.com>2022-05-19 21:16:47 +0300
committerVasily Gerasimov <UgnineSirdis@gmail.com>2022-05-19 21:16:47 +0300
commitb6eecf2aaed456793a9f0e559af11e63bace7e60 (patch)
tree1b4ff1c7b4891a0ba2cd50965b252a7a1e704c8e
parent0393681f68a46b340f802ada83a7b88b2e46d648 (diff)
downloadydb-b6eecf2aaed456793a9f0e559af11e63bace7e60.tar.gz
Slow checkpoint diagnostic in compute actor
ref:454db5d21f8b5d696674df5652a8a3e2cc76c1ea
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.cpp55
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.h7
2 files changed, 56 insertions, 6 deletions
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.cpp b/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.cpp
index d80a4f97008..e32000da668 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.cpp
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.cpp
@@ -29,6 +29,8 @@ using namespace NActors;
namespace {
+constexpr TDuration SLOW_CHECKPOINT_DURATION = TDuration::Minutes(1);
+
TString MakeStringForLog(const NDqProto::TCheckpoint& checkpoint) {
return TStringBuilder() << checkpoint.GetGeneration() << "." << checkpoint.GetId();
}
@@ -133,6 +135,7 @@ STRICT_STFUNC_EXC(TDqComputeActorCheckpoints::StateFunc,
hFunc(NActors::TEvInterconnect::TEvNodeDisconnected, Handle);
hFunc(NActors::TEvInterconnect::TEvNodeConnected, Handle);
hFunc(TEvRetryQueuePrivate::TEvRetry, Handle);
+ hFunc(TEvents::TEvWakeup, Handle);
cFunc(TEvents::TEvPoisonPill::EventType, PassAway);,
ExceptionFunc(std::exception, HandleException)
)
@@ -202,9 +205,10 @@ void TDqComputeActorCheckpoints::Handle(TEvDqCompute::TEvNewCheckpointCoordinato
Y_VERIFY(EventsQueue.OnEventReceived(ev->Get()));
EventsQueue.Send(new TEvDqCompute::TEvNewCheckpointCoordinatorAck());
- if (PendingCheckpoint) {
+ const bool resumeInputs = bool(PendingCheckpoint);
+ AbortCheckpoint();
+ if (resumeInputs) {
LOG_I("Drop pending checkpoint since coordinator is stale");
- PendingCheckpoint.Clear();
ComputeActor->ResumeInputs();
}
}
@@ -217,7 +221,7 @@ void TDqComputeActorCheckpoints::Handle(TEvDqCompute::TEvInjectCheckpoint::TPtr&
YQL_ENSURE(IngressTask, "Shouldn't inject barriers into non-ingress tasks");
YQL_ENSURE(!PendingCheckpoint);
- PendingCheckpoint = ev->Get()->Record.GetCheckpoint();
+ StartCheckpoint(ev->Get()->Record.GetCheckpoint());
LOG_CP_I("Got TEvInjectCheckpoint");
ComputeActor->ResumeExecution();
}
@@ -227,6 +231,8 @@ void TDqComputeActorCheckpoints::Handle(TEvDqCompute::TEvSaveTaskStateResult::TP
return;
}
+ SavingToDatabase = false;
+ CheckpointStartTime = TInstant::Zero();
EventsQueue.Send(ev->Release().Release(), ev->Cookie);
}
@@ -376,6 +382,25 @@ void TDqComputeActorCheckpoints::Handle(TEvRetryQueuePrivate::TEvRetry::TPtr& ev
EventsQueue.Retry();
}
+void TDqComputeActorCheckpoints::Handle(NActors::TEvents::TEvWakeup::TPtr&) {
+ if (CheckpointStartTime && (TActivationContext::Now() - CheckpointStartTime) >= SLOW_CHECKPOINT_DURATION) {
+ TStringBuilder checkpointDiagnostic;
+ if (PendingCheckpoint.Checkpoint) {
+ checkpointDiagnostic << "[Checkpoint " << MakeStringForLog(*PendingCheckpoint.Checkpoint) << "] ";
+ }
+ checkpointDiagnostic << "Slow checkpoint. Duration: " << (TInstant::Now() - CheckpointStartTime).Seconds() << 's';
+ if (PendingCheckpoint) {
+ checkpointDiagnostic << " CA: " << PendingCheckpoint.SavedComputeActorState;
+ if (PendingCheckpoint.SinksCount) {
+ checkpointDiagnostic << " Sinks: " << PendingCheckpoint.SavedSinkStatesCount << '/' << PendingCheckpoint.SinksCount;
+ }
+ }
+ checkpointDiagnostic << " SavingToDatabase: " << SavingToDatabase;
+ LOG_W(checkpointDiagnostic);
+ }
+ Schedule(SLOW_CHECKPOINT_DURATION, new NActors::TEvents::TEvWakeup());
+}
+
bool TDqComputeActorCheckpoints::HasPendingCheckpoint() const {
return PendingCheckpoint;
}
@@ -410,7 +435,7 @@ bool TDqComputeActorCheckpoints::SaveState() {
PendingCheckpoint.SavedComputeActorState = true;
ComputeActor->SaveState(*PendingCheckpoint.Checkpoint, PendingCheckpoint.ComputeActorState);
} catch (const std::exception& e) {
- PendingCheckpoint.Clear();
+ AbortCheckpoint();
LOG_CP_E("Failed to save state: " << e.what());
auto resultEv = MakeHolder<TEvDqCompute::TEvSaveTaskStateResult>();
@@ -428,7 +453,7 @@ bool TDqComputeActorCheckpoints::SaveState() {
void TDqComputeActorCheckpoints::RegisterCheckpoint(const NDqProto::TCheckpoint& checkpoint, ui64 channelId) {
if (!PendingCheckpoint) {
- PendingCheckpoint = checkpoint;
+ StartCheckpoint(checkpoint);
} else {
YQL_ENSURE(PendingCheckpoint.Checkpoint->GetGeneration() == checkpoint.GetGeneration());
YQL_ENSURE(PendingCheckpoint.Checkpoint->GetId() == checkpoint.GetId());
@@ -437,6 +462,23 @@ void TDqComputeActorCheckpoints::RegisterCheckpoint(const NDqProto::TCheckpoint&
ComputeActor->ResumeExecution();
}
+void TDqComputeActorCheckpoints::StartCheckpoint(const NDqProto::TCheckpoint& checkpoint) {
+ PendingCheckpoint = checkpoint;
+ CheckpointStartTime = TActivationContext::Now();
+ SavingToDatabase = false;
+
+ if (!SlowCheckpointsMonitoringStarted) {
+ SlowCheckpointsMonitoringStarted = true;
+ Schedule(SLOW_CHECKPOINT_DURATION, new NActors::TEvents::TEvWakeup());
+ }
+}
+
+void TDqComputeActorCheckpoints::AbortCheckpoint() {
+ PendingCheckpoint.Clear();
+ CheckpointStartTime = TInstant::Zero();
+ SavingToDatabase = false;
+}
+
void TDqComputeActorCheckpoints::OnSinkStateSaved(NDqProto::TSinkState&& state, ui64 outputIndex, const NDqProto::TCheckpoint& checkpoint) {
Y_VERIFY(CheckpointCoordinator);
Y_VERIFY(checkpoint.GetGeneration() <= CheckpointCoordinator->Generation);
@@ -468,8 +510,9 @@ void TDqComputeActorCheckpoints::TryToSavePendingCheckpoint() {
saveTaskStateRequest->State.Swap(&PendingCheckpoint.ComputeActorState);
Send(CheckpointStorage, std::move(saveTaskStateRequest));
- LOG_CP_I("Task checkpoint is done");
+ LOG_CP_I("Task checkpoint is done. Send to storage");
PendingCheckpoint.Clear();
+ SavingToDatabase = true;
}
}
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.h
index 1c9f2334473..4ec41f04e4c 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.h
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.h
@@ -98,6 +98,8 @@ public:
bool SaveState();
NDqProto::TCheckpoint GetPendingCheckpoint() const;
void RegisterCheckpoint(const NDqProto::TCheckpoint& checkpoint, ui64 channelId);
+ void StartCheckpoint(const NDqProto::TCheckpoint& checkpoint);
+ void AbortCheckpoint();
// Sink support.
void OnSinkStateSaved(NDqProto::TSinkState&& state, ui64 outputIndex, const NDqProto::TCheckpoint& checkpoint);
@@ -126,6 +128,7 @@ private:
void Handle(NActors::TEvInterconnect::TEvNodeDisconnected::TPtr& ev);
void Handle(NActors::TEvInterconnect::TEvNodeConnected::TPtr& ev);
void Handle(TEvRetryQueuePrivate::TEvRetry::TPtr& ev);
+ void Handle(NActors::TEvents::TEvWakeup::TPtr& ev);
void HandleException(const std::exception& err);
void PassAway() override;
@@ -153,6 +156,10 @@ private:
NYql::NDqProto::NDqStateLoadPlan::TTaskPlan TaskLoadPlan;
NDqProto::TCheckpoint RestoringTaskRunnerForCheckpoint;
ui64 RestoringTaskRunnerForEvent;
+
+ bool SlowCheckpointsMonitoringStarted = false;
+ TInstant CheckpointStartTime;
+ bool SavingToDatabase = false;
};
} // namespace NYql::NDq