diff options
author | hor911 <hor911@ydb.tech> | 2022-07-18 17:18:51 +0300 |
---|---|---|
committer | hor911 <hor911@ydb.tech> | 2022-07-18 17:18:51 +0300 |
commit | 09b4fccb98c8d8a57604f9e747b466219cc5fccd (patch) | |
tree | 2cbec00b3edaf4a192b56c7a6298cd4a0a55e6fc | |
parent | 526b2789bffbc9b3e315fcd39e6b94026efb8a71 (diff) | |
download | ydb-09b4fccb98c8d8a57604f9e747b466219cc5fccd.tar.gz |
Better better logging
-rw-r--r-- | contrib/libs/fmt/test/gtest-extra.h | 3 | ||||
-rw-r--r-- | ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.cpp | 59 |
2 files changed, 37 insertions, 25 deletions
diff --git a/contrib/libs/fmt/test/gtest-extra.h b/contrib/libs/fmt/test/gtest-extra.h index 36be158bbb7..010bde066b1 100644 --- a/contrib/libs/fmt/test/gtest-extra.h +++ b/contrib/libs/fmt/test/gtest-extra.h @@ -67,7 +67,8 @@ class OutputRedirect { fmt::file original_; // Original file passed to redirector. fmt::file read_end_; // Read end of the pipe where the output is redirected. - GTEST_DISALLOW_COPY_AND_ASSIGN_(OutputRedirect); + OutputRedirect(const OutputRedirect&) = delete; + OutputRedirect& operator= (const OutputRedirect&) = delete; void flush(); void restore(); diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.cpp b/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.cpp index 2cf4e3cf227..cf78bc01277 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.cpp +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.cpp @@ -18,16 +18,27 @@ #define LOG_E(s) \ LOG_ERROR_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, "[" << GraphId << "] Task: " << Task.GetId() << ". " << s) -#define LOG_CP_T(s) \ - LOG_T("[Checkpoint " << MakeStringForLog(*PendingCheckpoint.Checkpoint) << "] " << s) -#define LOG_CP_D(s) \ - LOG_D("[Checkpoint " << MakeStringForLog(*PendingCheckpoint.Checkpoint) << "] " << s) -#define LOG_CP_I(s) \ - LOG_I("[Checkpoint " << MakeStringForLog(*PendingCheckpoint.Checkpoint) << "] " << s) -#define LOG_CP_W(s) \ - LOG_W("[Checkpoint " << MakeStringForLog(*PendingCheckpoint.Checkpoint) << "] " << s) -#define LOG_CP_E(s) \ - LOG_E("[Checkpoint " << MakeStringForLog(*PendingCheckpoint.Checkpoint) << "] " << s) +#define LOG_CP_T(сheckpoint, s) \ + LOG_T("[Checkpoint " << MakeStringForLog(сheckpoint) << "] " << s) +#define LOG_CP_D(сheckpoint, s) \ + LOG_D("[Checkpoint " << MakeStringForLog(сheckpoint) << "] " << s) +#define LOG_CP_I(сheckpoint, s) \ + LOG_I("[Checkpoint " << MakeStringForLog(сheckpoint) << "] " << s) +#define LOG_CP_W(сheckpoint, s) \ + LOG_W("[Checkpoint " << MakeStringForLog(сheckpoint) << "] " << s) +#define LOG_CP_E(сheckpoint, s) \ + LOG_E("[Checkpoint " << MakeStringForLog(сheckpoint) << "] " << s) + +#define LOG_PCP_T(s) \ + LOG_CP_T(*PendingCheckpoint.Checkpoint, s) +#define LOG_PCP_D(s) \ + LOG_CP_D(*PendingCheckpoint.Checkpoint, s) +#define LOG_PCP_I(s) \ + LOG_CP_I(*PendingCheckpoint.Checkpoint, s) +#define LOG_PCP_W(s) \ + LOG_CP_W(*PendingCheckpoint.Checkpoint, s) +#define LOG_PCP_E(s) \ + LOG_CP_E(*PendingCheckpoint.Checkpoint, s) namespace NYql::NDq { @@ -227,7 +238,7 @@ void TDqComputeActorCheckpoints::Handle(TEvDqCompute::TEvInjectCheckpoint::TPtr& YQL_ENSURE(IngressTask, "Shouldn't inject barriers into non-ingress tasks"); YQL_ENSURE(!PendingCheckpoint); - LOG_CP_D("TEvInjectCheckpoint"); + LOG_PCP_D("TEvInjectCheckpoint"); StartCheckpoint(ev->Get()->Record.GetCheckpoint()); ComputeActor->ResumeExecution(); } @@ -254,7 +265,7 @@ void TDqComputeActorCheckpoints::Handle(TEvDqCompute::TEvRestoreFromCheckpoint:: ComputeActor->Stop(); StateLoadPlan = ev->Get()->Record.GetStateLoadPlan(); const auto& checkpoint = ev->Get()->Record.GetCheckpoint(); - LOG_CP_D("TEvRestoreFromCheckpoint, StateLoadPlan = " << StateLoadPlan); + LOG_CP_D(checkpoint, "TEvRestoreFromCheckpoint, StateLoadPlan = " << StateLoadPlan); switch (StateLoadPlan.GetStateType()) { case NDqProto::NDqStateLoadPlan::STATE_TYPE_EMPTY: { @@ -285,7 +296,7 @@ void TDqComputeActorCheckpoints::Handle(TEvDqCompute::TEvRestoreFromCheckpoint:: } default: { - LOG_CP_E("Unsupported state type: " + LOG_CP_E(checkpoint, "Unsupported state type: " << NDqProto::NDqStateLoadPlan::EStateType_Name(StateLoadPlan.GetStateType()) << " (" << static_cast<int>(StateLoadPlan.GetStateType()) << ")"); EventsQueue.Send(MakeHolder<TEvDqCompute::TEvRestoreFromCheckpointResult>(checkpoint, Task.GetId(), NDqProto::TEvRestoreFromCheckpointResult::INTERNAL_ERROR)); break; @@ -307,18 +318,18 @@ void TDqComputeActorCheckpoints::Handle(TEvDqCompute::TEvGetTaskStateResult::TPt } if (!ev->Get()->Issues.Empty()) { - LOG_CP_E("TEvGetTaskStateResult error: " << ev->Get()->Issues.ToOneLineString()); + LOG_CP_E(checkpoint, "TEvGetTaskStateResult error: " << ev->Get()->Issues.ToOneLineString()); EventsQueue.Send(MakeHolder<TEvDqCompute::TEvRestoreFromCheckpointResult>(checkpoint, Task.GetId(), NDqProto::TEvRestoreFromCheckpointResult::STORAGE_ERROR), ev->Cookie); return; } if (ev->Get()->States.size() != taskIdsSize) { - LOG_CP_E("TEvGetTaskStateResult unexpected states count: " << ev->Get()->States.size() << ", expected: " << taskIdsSize); + LOG_CP_E(checkpoint, "TEvGetTaskStateResult unexpected states count: " << ev->Get()->States.size() << ", expected: " << taskIdsSize); EventsQueue.Send(MakeHolder<TEvDqCompute::TEvRestoreFromCheckpointResult>(checkpoint, Task.GetId(), NDqProto::TEvRestoreFromCheckpointResult::STORAGE_ERROR), ev->Cookie); return; } - LOG_CP_D("TEvGetTaskStateResult: restoring state"); + LOG_CP_D(checkpoint, "TEvGetTaskStateResult: restoring state"); RestoringTaskRunnerForCheckpoint = checkpoint; RestoringTaskRunnerForEvent = ev->Cookie; if (StateLoadPlan.GetStateType() == NDqProto::NDqStateLoadPlan::STATE_TYPE_OWN) { @@ -336,12 +347,12 @@ void TDqComputeActorCheckpoints::Handle(TEvDqCompute::TEvGetTaskStateResult::TPt void TDqComputeActorCheckpoints::AfterStateLoading(const TMaybe<TString>& error) { auto& checkpoint = RestoringTaskRunnerForCheckpoint; if (error.Defined()) { - LOG_CP_E("Failed to load state: " << error << "ABORTED"); + LOG_CP_E(checkpoint, "Failed to load state: " << error << "ABORTED"); EventsQueue.Send(MakeHolder<TEvDqCompute::TEvRestoreFromCheckpointResult>(checkpoint, Task.GetId(), NDqProto::TEvRestoreFromCheckpointResult::INTERNAL_ERROR), RestoringTaskRunnerForEvent); return; } EventsQueue.Send(MakeHolder<TEvDqCompute::TEvRestoreFromCheckpointResult>(checkpoint, Task.GetId(), NDqProto::TEvRestoreFromCheckpointResult::OK), RestoringTaskRunnerForEvent); - LOG_CP_D("Checkpoint state restored"); + LOG_CP_D(checkpoint, "Checkpoint state restored"); } void TDqComputeActorCheckpoints::Handle(TEvDqCompute::TEvRun::TPtr& ev) { @@ -419,9 +430,9 @@ void TDqComputeActorCheckpoints::DoCheckpoint() { Y_VERIFY(CheckpointCoordinator); Y_VERIFY(PendingCheckpoint); - LOG_CP_D("Performing task checkpoint"); + LOG_PCP_D("Performing task checkpoint"); if (SaveState()) { - LOG_CP_T("Injecting checkpoint barrier to outputs"); + LOG_PCP_T("Injecting checkpoint barrier to outputs"); ComputeActor->InjectBarrierToOutputs(*PendingCheckpoint.Checkpoint); TryToSavePendingCheckpoint(); } @@ -435,7 +446,7 @@ bool TDqComputeActorCheckpoints::SaveState() { ComputeActor->SaveState(*PendingCheckpoint.Checkpoint, PendingCheckpoint.ComputeActorState); } catch (const std::exception& e) { AbortCheckpoint(); - LOG_CP_E("Failed to save state: " << e.what()); + LOG_PCP_E("Failed to save state: " << e.what()); auto resultEv = MakeHolder<TEvDqCompute::TEvSaveTaskStateResult>(); resultEv->Record.MutableCheckpoint()->CopyFrom(*PendingCheckpoint.Checkpoint); @@ -446,7 +457,7 @@ bool TDqComputeActorCheckpoints::SaveState() { return false; } - LOG_CP_T("CA state saved"); + LOG_PCP_T("CA state saved"); return true; } @@ -457,7 +468,7 @@ void TDqComputeActorCheckpoints::RegisterCheckpoint(const NDqProto::TCheckpoint& YQL_ENSURE(PendingCheckpoint.Checkpoint->GetGeneration() == checkpoint.GetGeneration()); YQL_ENSURE(PendingCheckpoint.Checkpoint->GetId() == checkpoint.GetId()); } - LOG_CP_D("Got checkpoint barrier from channel " << channelId); + LOG_PCP_D("Got checkpoint barrier from channel " << channelId); ComputeActor->ResumeExecution(); } @@ -509,7 +520,7 @@ void TDqComputeActorCheckpoints::TryToSavePendingCheckpoint() { saveTaskStateRequest->State.Swap(&PendingCheckpoint.ComputeActorState); Send(CheckpointStorage, std::move(saveTaskStateRequest)); - LOG_CP_D("Task checkpoint is done. Send to storage"); + LOG_PCP_D("Task checkpoint is done. Send to storage"); PendingCheckpoint.Clear(); SavingToDatabase = true; } |