about summary refs log tree commit diff stats
diff options
context:
space:
mode:
author hor911 <hor911@ydb.tech> 2022-07-16 14:51:13 +0300
committer hor911 <hor911@ydb.tech> 2022-07-16 14:51:13 +0300
commit 1fa71dfe5f97c247426cdba8779a5850b62b69dd (patch)
tree d7716dcbdaa608d160e3a9dca011c65982d76b26
parent 523de7ef4d4109ee44933eb75172e122a83873be (diff)
download ydb-1fa71dfe5f97c247426cdba8779a5850b62b69dd.tar.gz
Better checkpointing logging
-rw-r--r-- ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.cpp 79
-rw-r--r-- ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.h 2
2 files changed, 40 insertions, 41 deletions
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.cpp b/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.cpp
index e32000da66..2cf4e3cf22 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.cpp
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.cpp
@@ -7,6 +7,8 @@
#include <algorithm>
+#define LOG_T(s) \
+ LOG_TRACE_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, "[" << GraphId << "] Task: " << Task.GetId() << ". " << s)
#define LOG_D(s) \
LOG_DEBUG_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, "[" << GraphId << "] Task: " << Task.GetId() << ". " << s)
#define LOG_I(s) \
@@ -16,10 +18,14 @@
#define LOG_E(s) \
LOG_ERROR_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, "[" << GraphId << "] Task: " << Task.GetId() << ". " << s)
+#define LOG_CP_T(s) \
+ LOG_T("[Checkpoint " << MakeStringForLog(*PendingCheckpoint.Checkpoint) << "] " << s)
#define LOG_CP_D(s) \
LOG_D("[Checkpoint " << MakeStringForLog(*PendingCheckpoint.Checkpoint) << "] " << s)
#define LOG_CP_I(s) \
LOG_I("[Checkpoint " << MakeStringForLog(*PendingCheckpoint.Checkpoint) << "] " << s)
+#define LOG_CP_W(s) \
+ LOG_W("[Checkpoint " << MakeStringForLog(*PendingCheckpoint.Checkpoint) << "] " << s)
#define LOG_CP_E(s) \
LOG_E("[Checkpoint " << MakeStringForLog(*PendingCheckpoint.Checkpoint) << "] " << s)
@@ -172,7 +178,7 @@ bool TDqComputeActorCheckpoints::ShouldIgnoreOldCoordinator(const E& ev, bool ve
Y_VERIFY(!verifyOnGenerationFromFuture || !CheckpointCoordinator || generation <= CheckpointCoordinator->Generation,
"Got incorrect checkpoint coordinator generation: %lu > %lu", generation, CheckpointCoordinator->Generation);
if (CheckpointCoordinator && generation < CheckpointCoordinator->Generation) {
- LOG_D("Ignoring event " << ev->Get()->ToStringHeader() << " from previous coordinator: "
+ LOG_W("Ignoring event " << ev->Get()->ToStringHeader() << " from previous coordinator: "
<< generation << " < " << CheckpointCoordinator->Generation);
return true;
}
@@ -184,7 +190,7 @@ void TDqComputeActorCheckpoints::Handle(TEvDqCompute::TEvNewCheckpointCoordinato
return;
}
const ui64 newGeneration = ev->Get()->Record.GetGeneration();
- LOG_I("Got TEvNewCheckpointCoordinator event: generation " << newGeneration << ", actorId: " << ev->Sender);
+ LOG_D("Got TEvNewCheckpointCoordinator event: generation " << newGeneration << ", actorId: " << ev->Sender);
if (CheckpointCoordinator && CheckpointCoordinator->Generation == newGeneration) { // The same message. It was retry from coordinator.
Y_VERIFY(CheckpointCoordinator->ActorId == ev->Sender, "there shouldn't be two different checkpoint coordinators with the same generation");
@@ -193,9 +199,9 @@ void TDqComputeActorCheckpoints::Handle(TEvDqCompute::TEvNewCheckpointCoordinato
}
if (CheckpointCoordinator) {
- LOG_I("Replace stale checkpoint coordinator (generation = " << CheckpointCoordinator->Generation << ") with a new one");
+ LOG_T("Replace stale checkpoint coordinator (generation = " << CheckpointCoordinator->Generation << ") with a new one");
} else {
- LOG_I("Assign checkpoint coordinator (generation = " << newGeneration << ")");
+ LOG_T("Assign checkpoint coordinator (generation = " << newGeneration << ")");
}
CheckpointCoordinator = TCheckpointCoordinatorId(ev->Sender, newGeneration);
@@ -208,7 +214,7 @@ void TDqComputeActorCheckpoints::Handle(TEvDqCompute::TEvNewCheckpointCoordinato
const bool resumeInputs = bool(PendingCheckpoint);
AbortCheckpoint();
if (resumeInputs) {
- LOG_I("Drop pending checkpoint since coordinator is stale");
+ LOG_W("Drop pending checkpoint since coordinator is stale");
ComputeActor->ResumeInputs();
}
}
@@ -221,8 +227,8 @@ void TDqComputeActorCheckpoints::Handle(TEvDqCompute::TEvInjectCheckpoint::TPtr&
YQL_ENSURE(IngressTask, "Shouldn't inject barriers into non-ingress tasks");
YQL_ENSURE(!PendingCheckpoint);
+ LOG_CP_D("TEvInjectCheckpoint");
StartCheckpoint(ev->Get()->Record.GetCheckpoint());
- LOG_CP_I("Got TEvInjectCheckpoint");
ComputeActor->ResumeExecution();
}
@@ -246,13 +252,12 @@ void TDqComputeActorCheckpoints::Handle(TEvDqCompute::TEvRestoreFromCheckpoint::
}
ComputeActor->Stop();
- TaskLoadPlan = ev->Get()->Record.GetStateLoadPlan();
+ StateLoadPlan = ev->Get()->Record.GetStateLoadPlan();
const auto& checkpoint = ev->Get()->Record.GetCheckpoint();
- LOG_I("[Checkpoint " << MakeStringForLog(checkpoint) << "] Got TEvRestoreFromCheckpoint event with plan " << TaskLoadPlan);
- switch (TaskLoadPlan.GetStateType()) {
+ LOG_CP_D("TEvRestoreFromCheckpoint, StateLoadPlan = " << StateLoadPlan);
+ switch (StateLoadPlan.GetStateType()) {
case NDqProto::NDqStateLoadPlan::STATE_TYPE_EMPTY:
{
- LOG_I("[Checkpoint " << MakeStringForLog(checkpoint) << "] Restored from empty state");
EventsQueue.Send(MakeHolder<TEvDqCompute::TEvRestoreFromCheckpointResult>(checkpoint, Task.GetId(), NDqProto::TEvRestoreFromCheckpointResult::OK));
break;
}
@@ -273,15 +278,15 @@ void TDqComputeActorCheckpoints::Handle(TEvDqCompute::TEvRestoreFromCheckpoint::
CheckpointStorage,
new TEvDqCompute::TEvGetTaskState(
GraphId,
- TaskIdsFromLoadPlan(TaskLoadPlan),
+ TaskIdsFromLoadPlan(StateLoadPlan),
ev->Get()->Record.GetCheckpoint(),
CheckpointCoordinator->Generation));
break;
}
default:
{
- LOG_E("[Checkpoint " << MakeStringForLog(checkpoint) << "] Unsupported state type: "
- << NDqProto::NDqStateLoadPlan::EStateType_Name(TaskLoadPlan.GetStateType()) << " (" << static_cast<int>(TaskLoadPlan.GetStateType()) << ")");
+ LOG_CP_E("Unsupported state type: "
+ << NDqProto::NDqStateLoadPlan::EStateType_Name(StateLoadPlan.GetStateType()) << " (" << static_cast<int>(StateLoadPlan.GetStateType()) << ")");
EventsQueue.Send(MakeHolder<TEvDqCompute::TEvRestoreFromCheckpointResult>(checkpoint, Task.GetId(), NDqProto::TEvRestoreFromCheckpointResult::INTERNAL_ERROR));
break;
}
@@ -296,51 +301,47 @@ void TDqComputeActorCheckpoints::Handle(TEvDqCompute::TEvGetTaskStateResult::TPt
auto& checkpoint = ev->Get()->Checkpoint;
std::vector<ui64> taskIds;
size_t taskIdsSize = 1;
- if (TaskLoadPlan.GetStateType() == NDqProto::NDqStateLoadPlan::STATE_TYPE_FOREIGN) {
- taskIds = TaskIdsFromLoadPlan(TaskLoadPlan);
+ if (StateLoadPlan.GetStateType() == NDqProto::NDqStateLoadPlan::STATE_TYPE_FOREIGN) {
+ taskIds = TaskIdsFromLoadPlan(StateLoadPlan);
taskIdsSize = taskIds.size();
}
if (!ev->Get()->Issues.Empty()) {
- LOG_E("[Checkpoint " << MakeStringForLog(checkpoint)
- << "] Can't get state from storage: " << ev->Get()->Issues.ToString());
+ LOG_CP_E("TEvGetTaskStateResult error: " << ev->Get()->Issues.ToOneLineString());
EventsQueue.Send(MakeHolder<TEvDqCompute::TEvRestoreFromCheckpointResult>(checkpoint, Task.GetId(), NDqProto::TEvRestoreFromCheckpointResult::STORAGE_ERROR), ev->Cookie);
return;
}
if (ev->Get()->States.size() != taskIdsSize) {
- LOG_E("[Checkpoint " << MakeStringForLog(checkpoint)
- << "] Got unexpected states count. States count: " << ev->Get()->States.size()
- << ". Expected states count: " << taskIdsSize);
+ LOG_CP_E("TEvGetTaskStateResult unexpected states count: " << ev->Get()->States.size() << ", expected: " << taskIdsSize);
EventsQueue.Send(MakeHolder<TEvDqCompute::TEvRestoreFromCheckpointResult>(checkpoint, Task.GetId(), NDqProto::TEvRestoreFromCheckpointResult::STORAGE_ERROR), ev->Cookie);
return;
}
- LOG_I("[Checkpoint " << MakeStringForLog(checkpoint) << "] Got TEvGetTaskStateResult event, restoring state");
+ LOG_CP_D("TEvGetTaskStateResult: restoring state");
RestoringTaskRunnerForCheckpoint = checkpoint;
RestoringTaskRunnerForEvent = ev->Cookie;
- if (TaskLoadPlan.GetStateType() == NDqProto::NDqStateLoadPlan::STATE_TYPE_OWN) {
+ if (StateLoadPlan.GetStateType() == NDqProto::NDqStateLoadPlan::STATE_TYPE_OWN) {
ComputeActor->LoadState(std::move(ev->Get()->States[0]));
- } else if (TaskLoadPlan.GetStateType() == NDqProto::NDqStateLoadPlan::STATE_TYPE_FOREIGN) {
- NDqProto::TComputeActorState state = CombineForeignState(TaskLoadPlan, ev->Get()->States, taskIds);
+ } else if (StateLoadPlan.GetStateType() == NDqProto::NDqStateLoadPlan::STATE_TYPE_FOREIGN) {
+ NDqProto::TComputeActorState state = CombineForeignState(StateLoadPlan, ev->Get()->States, taskIds);
ComputeActor->LoadState(std::move(state));
} else {
Y_FAIL("Unprocessed state type %s (%d)",
- NDqProto::NDqStateLoadPlan::EStateType_Name(TaskLoadPlan.GetStateType()).c_str(),
- static_cast<int>(TaskLoadPlan.GetStateType()));
+ NDqProto::NDqStateLoadPlan::EStateType_Name(StateLoadPlan.GetStateType()).c_str(),
+ static_cast<int>(StateLoadPlan.GetStateType()));
}
}
void TDqComputeActorCheckpoints::AfterStateLoading(const TMaybe<TString>& error) {
auto& checkpoint = RestoringTaskRunnerForCheckpoint;
if (error.Defined()) {
- LOG_E("[Checkpoint " << MakeStringForLog(checkpoint) << "] Failed to load state: " << error);
+ LOG_CP_E("Failed to load state: " << error << "ABORTED");
EventsQueue.Send(MakeHolder<TEvDqCompute::TEvRestoreFromCheckpointResult>(checkpoint, Task.GetId(), NDqProto::TEvRestoreFromCheckpointResult::INTERNAL_ERROR), RestoringTaskRunnerForEvent);
- LOG_I("[Checkpoint " << MakeStringForLog(checkpoint) << "] Checkpoint state restoration aborted");
return;
}
EventsQueue.Send(MakeHolder<TEvDqCompute::TEvRestoreFromCheckpointResult>(checkpoint, Task.GetId(), NDqProto::TEvRestoreFromCheckpointResult::OK), RestoringTaskRunnerForEvent);
- LOG_I("[Checkpoint " << MakeStringForLog(checkpoint) << "] Checkpoint state restored");
+ LOG_CP_D("Checkpoint state restored");
}
void TDqComputeActorCheckpoints::Handle(TEvDqCompute::TEvRun::TPtr& ev) {
@@ -363,12 +364,12 @@ void TDqComputeActorCheckpoints::Handle(TEvDqCompute::TEvCommitState::TPtr& ev)
}
void TDqComputeActorCheckpoints::Handle(NActors::TEvents::TEvPoison::TPtr&) {
- LOG_D("pass away");
+ LOG_I("Pass Away");
PassAway();
}
void TDqComputeActorCheckpoints::Handle(NActors::TEvInterconnect::TEvNodeDisconnected::TPtr& ev) {
- LOG_I("Handle disconnected node " << ev->Get()->NodeId);
+ LOG_D("Handle disconnected node " << ev->Get()->NodeId);
EventsQueue.HandleNodeDisconnected(ev->Get()->NodeId);
}
@@ -418,9 +419,9 @@ void TDqComputeActorCheckpoints::DoCheckpoint() {
Y_VERIFY(CheckpointCoordinator);
Y_VERIFY(PendingCheckpoint);
- LOG_CP_I("Performing task checkpoint");
+ LOG_CP_D("Performing task checkpoint");
if (SaveState()) {
- LOG_CP_D("Injecting checkpoint barrier to outputs");
+ LOG_CP_T("Injecting checkpoint barrier to outputs");
ComputeActor->InjectBarrierToOutputs(*PendingCheckpoint.Checkpoint);
TryToSavePendingCheckpoint();
}
@@ -428,8 +429,6 @@ void TDqComputeActorCheckpoints::DoCheckpoint() {
[[nodiscard]]
bool TDqComputeActorCheckpoints::SaveState() {
- LOG_CP_D("Saving task state");
-
try {
Y_VERIFY(!PendingCheckpoint.SavedComputeActorState);
PendingCheckpoint.SavedComputeActorState = true;
@@ -447,7 +446,7 @@ bool TDqComputeActorCheckpoints::SaveState() {
return false;
}
- LOG_CP_D("Compute actor state saved");
+ LOG_CP_T("CA state saved");
return true;
}
@@ -458,7 +457,7 @@ void TDqComputeActorCheckpoints::RegisterCheckpoint(const NDqProto::TCheckpoint&
YQL_ENSURE(PendingCheckpoint.Checkpoint->GetGeneration() == checkpoint.GetGeneration());
YQL_ENSURE(PendingCheckpoint.Checkpoint->GetId() == checkpoint.GetId());
}
- LOG_CP_I("Got checkpoint barrier from channel " << channelId);
+ LOG_CP_D("Got checkpoint barrier from channel " << channelId);
ComputeActor->ResumeExecution();
}
@@ -483,7 +482,7 @@ void TDqComputeActorCheckpoints::OnSinkStateSaved(NDqProto::TSinkState&& state,
Y_VERIFY(CheckpointCoordinator);
Y_VERIFY(checkpoint.GetGeneration() <= CheckpointCoordinator->Generation);
if (checkpoint.GetGeneration() < CheckpointCoordinator->Generation) {
- LOG_D("Ignoring sink[" << outputIndex << "] state saved event from previous coordinator: "
+ LOG_W("Ignoring sink[" << outputIndex << "] state saved event from previous coordinator: "
<< checkpoint.GetGeneration() << " < " << CheckpointCoordinator->Generation);
return;
}
@@ -498,7 +497,7 @@ void TDqComputeActorCheckpoints::OnSinkStateSaved(NDqProto::TSinkState&& state,
*sinkState = std::move(state);
sinkState->SetOutputIndex(outputIndex); // Set index explicitly to avoid errors
++PendingCheckpoint.SavedSinkStatesCount;
- LOG_D("Sink[" << outputIndex << "] state saved");
+ LOG_T("Sink[" << outputIndex << "] state saved");
TryToSavePendingCheckpoint();
}
@@ -510,7 +509,7 @@ void TDqComputeActorCheckpoints::TryToSavePendingCheckpoint() {
saveTaskStateRequest->State.Swap(&PendingCheckpoint.ComputeActorState);
Send(CheckpointStorage, std::move(saveTaskStateRequest));
- LOG_CP_I("Task checkpoint is done. Send to storage");
+ LOG_CP_D("Task checkpoint is done. Send to storage");
PendingCheckpoint.Clear();
SavingToDatabase = true;
}
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.h
index e0e671fdac..61c007708d 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.h
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.h
@@ -153,7 +153,7 @@ private:
TRetryEventsQueue EventsQueue;
// Restore
- NYql::NDqProto::NDqStateLoadPlan::TTaskPlan TaskLoadPlan;
+ NYql::NDqProto::NDqStateLoadPlan::TTaskPlan StateLoadPlan;
NDqProto::TCheckpoint RestoringTaskRunnerForCheckpoint;
ui64 RestoringTaskRunnerForEvent;