diff options
author | hor911 <hor911@ydb.tech> | 2022-07-16 14:51:13 +0300 |
---|---|---|
committer | hor911 <hor911@ydb.tech> | 2022-07-16 14:51:13 +0300 |
commit | 1fa71dfe5f97c247426cdba8779a5850b62b69dd (patch) | |
tree | d7716dcbdaa608d160e3a9dca011c65982d76b26 | |
parent | 523de7ef4d4109ee44933eb75172e122a83873be (diff) | |
download | ydb-1fa71dfe5f97c247426cdba8779a5850b62b69dd.tar.gz |
Better checkpointing logging
-rw-r--r-- | ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.cpp | 79 | ||||
-rw-r--r-- | ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.h | 2 |
2 files changed, 40 insertions, 41 deletions
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.cpp b/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.cpp index e32000da66..2cf4e3cf22 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.cpp +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.cpp @@ -7,6 +7,8 @@ #include <algorithm> +#define LOG_T(s) \ + LOG_TRACE_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, "[" << GraphId << "] Task: " << Task.GetId() << ". " << s) #define LOG_D(s) \ LOG_DEBUG_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, "[" << GraphId << "] Task: " << Task.GetId() << ". " << s) #define LOG_I(s) \ @@ -16,10 +18,14 @@ #define LOG_E(s) \ LOG_ERROR_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, "[" << GraphId << "] Task: " << Task.GetId() << ". " << s) +#define LOG_CP_T(s) \ + LOG_T("[Checkpoint " << MakeStringForLog(*PendingCheckpoint.Checkpoint) << "] " << s) #define LOG_CP_D(s) \ LOG_D("[Checkpoint " << MakeStringForLog(*PendingCheckpoint.Checkpoint) << "] " << s) #define LOG_CP_I(s) \ LOG_I("[Checkpoint " << MakeStringForLog(*PendingCheckpoint.Checkpoint) << "] " << s) +#define LOG_CP_W(s) \ + LOG_W("[Checkpoint " << MakeStringForLog(*PendingCheckpoint.Checkpoint) << "] " << s) #define LOG_CP_E(s) \ LOG_E("[Checkpoint " << MakeStringForLog(*PendingCheckpoint.Checkpoint) << "] " << s) @@ -172,7 +178,7 @@ bool TDqComputeActorCheckpoints::ShouldIgnoreOldCoordinator(const E& ev, bool ve Y_VERIFY(!verifyOnGenerationFromFuture || !CheckpointCoordinator || generation <= CheckpointCoordinator->Generation, "Got incorrect checkpoint coordinator generation: %lu > %lu", generation, CheckpointCoordinator->Generation); if (CheckpointCoordinator && generation < CheckpointCoordinator->Generation) { - LOG_D("Ignoring event " << ev->Get()->ToStringHeader() << " from previous coordinator: " + LOG_W("Ignoring event " << ev->Get()->ToStringHeader() << " from previous coordinator: " << generation << " < " << CheckpointCoordinator->Generation); return true; } @@ -184,7 +190,7 @@ void TDqComputeActorCheckpoints::Handle(TEvDqCompute::TEvNewCheckpointCoordinato return; } const ui64 newGeneration = ev->Get()->Record.GetGeneration(); - LOG_I("Got TEvNewCheckpointCoordinator event: generation " << newGeneration << ", actorId: " << ev->Sender); + LOG_D("Got TEvNewCheckpointCoordinator event: generation " << newGeneration << ", actorId: " << ev->Sender); if (CheckpointCoordinator && CheckpointCoordinator->Generation == newGeneration) { // The same message. It was retry from coordinator. Y_VERIFY(CheckpointCoordinator->ActorId == ev->Sender, "there shouldn't be two different checkpoint coordinators with the same generation"); @@ -193,9 +199,9 @@ void TDqComputeActorCheckpoints::Handle(TEvDqCompute::TEvNewCheckpointCoordinato } if (CheckpointCoordinator) { - LOG_I("Replace stale checkpoint coordinator (generation = " << CheckpointCoordinator->Generation << ") with a new one"); + LOG_T("Replace stale checkpoint coordinator (generation = " << CheckpointCoordinator->Generation << ") with a new one"); } else { - LOG_I("Assign checkpoint coordinator (generation = " << newGeneration << ")"); + LOG_T("Assign checkpoint coordinator (generation = " << newGeneration << ")"); } CheckpointCoordinator = TCheckpointCoordinatorId(ev->Sender, newGeneration); @@ -208,7 +214,7 @@ void TDqComputeActorCheckpoints::Handle(TEvDqCompute::TEvNewCheckpointCoordinato const bool resumeInputs = bool(PendingCheckpoint); AbortCheckpoint(); if (resumeInputs) { - LOG_I("Drop pending checkpoint since coordinator is stale"); + LOG_W("Drop pending checkpoint since coordinator is stale"); ComputeActor->ResumeInputs(); } } @@ -221,8 +227,8 @@ void TDqComputeActorCheckpoints::Handle(TEvDqCompute::TEvInjectCheckpoint::TPtr& YQL_ENSURE(IngressTask, "Shouldn't inject barriers into non-ingress tasks"); YQL_ENSURE(!PendingCheckpoint); + LOG_CP_D("TEvInjectCheckpoint"); StartCheckpoint(ev->Get()->Record.GetCheckpoint()); - LOG_CP_I("Got TEvInjectCheckpoint"); ComputeActor->ResumeExecution(); } @@ -246,13 +252,12 @@ void TDqComputeActorCheckpoints::Handle(TEvDqCompute::TEvRestoreFromCheckpoint:: } ComputeActor->Stop(); - TaskLoadPlan = ev->Get()->Record.GetStateLoadPlan(); + StateLoadPlan = ev->Get()->Record.GetStateLoadPlan(); const auto& checkpoint = ev->Get()->Record.GetCheckpoint(); - LOG_I("[Checkpoint " << MakeStringForLog(checkpoint) << "] Got TEvRestoreFromCheckpoint event with plan " << TaskLoadPlan); - switch (TaskLoadPlan.GetStateType()) { + LOG_CP_D("TEvRestoreFromCheckpoint, StateLoadPlan = " << StateLoadPlan); + switch (StateLoadPlan.GetStateType()) { case NDqProto::NDqStateLoadPlan::STATE_TYPE_EMPTY: { - LOG_I("[Checkpoint " << MakeStringForLog(checkpoint) << "] Restored from empty state"); EventsQueue.Send(MakeHolder<TEvDqCompute::TEvRestoreFromCheckpointResult>(checkpoint, Task.GetId(), NDqProto::TEvRestoreFromCheckpointResult::OK)); break; } @@ -273,15 +278,15 @@ void TDqComputeActorCheckpoints::Handle(TEvDqCompute::TEvRestoreFromCheckpoint:: CheckpointStorage, new TEvDqCompute::TEvGetTaskState( GraphId, - TaskIdsFromLoadPlan(TaskLoadPlan), + TaskIdsFromLoadPlan(StateLoadPlan), ev->Get()->Record.GetCheckpoint(), CheckpointCoordinator->Generation)); break; } default: { - LOG_E("[Checkpoint " << MakeStringForLog(checkpoint) << "] Unsupported state type: " - << NDqProto::NDqStateLoadPlan::EStateType_Name(TaskLoadPlan.GetStateType()) << " (" << static_cast<int>(TaskLoadPlan.GetStateType()) << ")"); + LOG_CP_E("Unsupported state type: " + << NDqProto::NDqStateLoadPlan::EStateType_Name(StateLoadPlan.GetStateType()) << " (" << static_cast<int>(StateLoadPlan.GetStateType()) << ")"); EventsQueue.Send(MakeHolder<TEvDqCompute::TEvRestoreFromCheckpointResult>(checkpoint, Task.GetId(), NDqProto::TEvRestoreFromCheckpointResult::INTERNAL_ERROR)); break; } @@ -296,51 +301,47 @@ void TDqComputeActorCheckpoints::Handle(TEvDqCompute::TEvGetTaskStateResult::TPt auto& checkpoint = ev->Get()->Checkpoint; std::vector<ui64> taskIds; size_t taskIdsSize = 1; - if (TaskLoadPlan.GetStateType() == NDqProto::NDqStateLoadPlan::STATE_TYPE_FOREIGN) { - taskIds = TaskIdsFromLoadPlan(TaskLoadPlan); + if (StateLoadPlan.GetStateType() == NDqProto::NDqStateLoadPlan::STATE_TYPE_FOREIGN) { + taskIds = TaskIdsFromLoadPlan(StateLoadPlan); taskIdsSize = taskIds.size(); } if (!ev->Get()->Issues.Empty()) { - LOG_E("[Checkpoint " << MakeStringForLog(checkpoint) - << "] Can't get state from storage: " << ev->Get()->Issues.ToString()); + LOG_CP_E("TEvGetTaskStateResult error: " << ev->Get()->Issues.ToOneLineString()); EventsQueue.Send(MakeHolder<TEvDqCompute::TEvRestoreFromCheckpointResult>(checkpoint, Task.GetId(), NDqProto::TEvRestoreFromCheckpointResult::STORAGE_ERROR), ev->Cookie); return; } if (ev->Get()->States.size() != taskIdsSize) { - LOG_E("[Checkpoint " << MakeStringForLog(checkpoint) - << "] Got unexpected states count. States count: " << ev->Get()->States.size() - << ". Expected states count: " << taskIdsSize); + LOG_CP_E("TEvGetTaskStateResult unexpected states count: " << ev->Get()->States.size() << ", expected: " << taskIdsSize); EventsQueue.Send(MakeHolder<TEvDqCompute::TEvRestoreFromCheckpointResult>(checkpoint, Task.GetId(), NDqProto::TEvRestoreFromCheckpointResult::STORAGE_ERROR), ev->Cookie); return; } - LOG_I("[Checkpoint " << MakeStringForLog(checkpoint) << "] Got TEvGetTaskStateResult event, restoring state"); + LOG_CP_D("TEvGetTaskStateResult: restoring state"); RestoringTaskRunnerForCheckpoint = checkpoint; RestoringTaskRunnerForEvent = ev->Cookie; - if (TaskLoadPlan.GetStateType() == NDqProto::NDqStateLoadPlan::STATE_TYPE_OWN) { + if (StateLoadPlan.GetStateType() == NDqProto::NDqStateLoadPlan::STATE_TYPE_OWN) { ComputeActor->LoadState(std::move(ev->Get()->States[0])); - } else if (TaskLoadPlan.GetStateType() == NDqProto::NDqStateLoadPlan::STATE_TYPE_FOREIGN) { - NDqProto::TComputeActorState state = CombineForeignState(TaskLoadPlan, ev->Get()->States, taskIds); + } else if (StateLoadPlan.GetStateType() == NDqProto::NDqStateLoadPlan::STATE_TYPE_FOREIGN) { + NDqProto::TComputeActorState state = CombineForeignState(StateLoadPlan, ev->Get()->States, taskIds); ComputeActor->LoadState(std::move(state)); } else { Y_FAIL("Unprocessed state type %s (%d)", - NDqProto::NDqStateLoadPlan::EStateType_Name(TaskLoadPlan.GetStateType()).c_str(), - static_cast<int>(TaskLoadPlan.GetStateType())); + NDqProto::NDqStateLoadPlan::EStateType_Name(StateLoadPlan.GetStateType()).c_str(), + static_cast<int>(StateLoadPlan.GetStateType())); } } void TDqComputeActorCheckpoints::AfterStateLoading(const TMaybe<TString>& error) { auto& checkpoint = RestoringTaskRunnerForCheckpoint; if (error.Defined()) { - LOG_E("[Checkpoint " << MakeStringForLog(checkpoint) << "] Failed to load state: " << error); + LOG_CP_E("Failed to load state: " << error << "ABORTED"); EventsQueue.Send(MakeHolder<TEvDqCompute::TEvRestoreFromCheckpointResult>(checkpoint, Task.GetId(), NDqProto::TEvRestoreFromCheckpointResult::INTERNAL_ERROR), RestoringTaskRunnerForEvent); - LOG_I("[Checkpoint " << MakeStringForLog(checkpoint) << "] Checkpoint state restoration aborted"); return; } EventsQueue.Send(MakeHolder<TEvDqCompute::TEvRestoreFromCheckpointResult>(checkpoint, Task.GetId(), NDqProto::TEvRestoreFromCheckpointResult::OK), RestoringTaskRunnerForEvent); - LOG_I("[Checkpoint " << MakeStringForLog(checkpoint) << "] Checkpoint state restored"); + LOG_CP_D("Checkpoint state restored"); } void TDqComputeActorCheckpoints::Handle(TEvDqCompute::TEvRun::TPtr& ev) { @@ -363,12 +364,12 @@ void TDqComputeActorCheckpoints::Handle(TEvDqCompute::TEvCommitState::TPtr& ev) } void TDqComputeActorCheckpoints::Handle(NActors::TEvents::TEvPoison::TPtr&) { - LOG_D("pass away"); + LOG_I("Pass Away"); PassAway(); } void TDqComputeActorCheckpoints::Handle(NActors::TEvInterconnect::TEvNodeDisconnected::TPtr& ev) { - LOG_I("Handle disconnected node " << ev->Get()->NodeId); + LOG_D("Handle disconnected node " << ev->Get()->NodeId); EventsQueue.HandleNodeDisconnected(ev->Get()->NodeId); } @@ -418,9 +419,9 @@ void TDqComputeActorCheckpoints::DoCheckpoint() { Y_VERIFY(CheckpointCoordinator); Y_VERIFY(PendingCheckpoint); - LOG_CP_I("Performing task checkpoint"); + LOG_CP_D("Performing task checkpoint"); if (SaveState()) { - LOG_CP_D("Injecting checkpoint barrier to outputs"); + LOG_CP_T("Injecting checkpoint barrier to outputs"); ComputeActor->InjectBarrierToOutputs(*PendingCheckpoint.Checkpoint); TryToSavePendingCheckpoint(); } @@ -428,8 +429,6 @@ void TDqComputeActorCheckpoints::DoCheckpoint() { [[nodiscard]] bool TDqComputeActorCheckpoints::SaveState() { - LOG_CP_D("Saving task state"); - try { Y_VERIFY(!PendingCheckpoint.SavedComputeActorState); PendingCheckpoint.SavedComputeActorState = true; @@ -447,7 +446,7 @@ bool TDqComputeActorCheckpoints::SaveState() { return false; } - LOG_CP_D("Compute actor state saved"); + LOG_CP_T("CA state saved"); return true; } @@ -458,7 +457,7 @@ void TDqComputeActorCheckpoints::RegisterCheckpoint(const NDqProto::TCheckpoint& YQL_ENSURE(PendingCheckpoint.Checkpoint->GetGeneration() == checkpoint.GetGeneration()); YQL_ENSURE(PendingCheckpoint.Checkpoint->GetId() == checkpoint.GetId()); } - LOG_CP_I("Got checkpoint barrier from channel " << channelId); + LOG_CP_D("Got checkpoint barrier from channel " << channelId); ComputeActor->ResumeExecution(); } @@ -483,7 +482,7 @@ void TDqComputeActorCheckpoints::OnSinkStateSaved(NDqProto::TSinkState&& state, Y_VERIFY(CheckpointCoordinator); Y_VERIFY(checkpoint.GetGeneration() <= CheckpointCoordinator->Generation); if (checkpoint.GetGeneration() < CheckpointCoordinator->Generation) { - LOG_D("Ignoring sink[" << outputIndex << "] state saved event from previous coordinator: " + LOG_W("Ignoring sink[" << outputIndex << "] state saved event from previous coordinator: " << checkpoint.GetGeneration() << " < " << CheckpointCoordinator->Generation); return; } @@ -498,7 +497,7 @@ void TDqComputeActorCheckpoints::OnSinkStateSaved(NDqProto::TSinkState&& state, *sinkState = std::move(state); sinkState->SetOutputIndex(outputIndex); // Set index explicitly to avoid errors ++PendingCheckpoint.SavedSinkStatesCount; - LOG_D("Sink[" << outputIndex << "] state saved"); + LOG_T("Sink[" << outputIndex << "] state saved"); TryToSavePendingCheckpoint(); } @@ -510,7 +509,7 @@ void TDqComputeActorCheckpoints::TryToSavePendingCheckpoint() { saveTaskStateRequest->State.Swap(&PendingCheckpoint.ComputeActorState); Send(CheckpointStorage, std::move(saveTaskStateRequest)); - LOG_CP_I("Task checkpoint is done. Send to storage"); + LOG_CP_D("Task checkpoint is done. Send to storage"); PendingCheckpoint.Clear(); SavingToDatabase = true; } diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.h index e0e671fdac..61c007708d 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.h @@ -153,7 +153,7 @@ private: TRetryEventsQueue EventsQueue; // Restore - NYql::NDqProto::NDqStateLoadPlan::TTaskPlan TaskLoadPlan; + NYql::NDqProto::NDqStateLoadPlan::TTaskPlan StateLoadPlan; NDqProto::TCheckpoint RestoringTaskRunnerForCheckpoint; ui64 RestoringTaskRunnerForEvent; |