aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorhor911 <hor911@ydb.tech>2022-07-18 17:18:51 +0300
committerhor911 <hor911@ydb.tech>2022-07-18 17:18:51 +0300
commit09b4fccb98c8d8a57604f9e747b466219cc5fccd (patch)
tree2cbec00b3edaf4a192b56c7a6298cd4a0a55e6fc
parent526b2789bffbc9b3e315fcd39e6b94026efb8a71 (diff)
downloadydb-09b4fccb98c8d8a57604f9e747b466219cc5fccd.tar.gz
Better better logging
-rw-r--r--contrib/libs/fmt/test/gtest-extra.h3
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.cpp59
2 files changed, 37 insertions, 25 deletions
diff --git a/contrib/libs/fmt/test/gtest-extra.h b/contrib/libs/fmt/test/gtest-extra.h
index 36be158bbb7..010bde066b1 100644
--- a/contrib/libs/fmt/test/gtest-extra.h
+++ b/contrib/libs/fmt/test/gtest-extra.h
@@ -67,7 +67,8 @@ class OutputRedirect {
fmt::file original_; // Original file passed to redirector.
fmt::file read_end_; // Read end of the pipe where the output is redirected.
- GTEST_DISALLOW_COPY_AND_ASSIGN_(OutputRedirect);
+ OutputRedirect(const OutputRedirect&) = delete;
+ OutputRedirect& operator= (const OutputRedirect&) = delete;
void flush();
void restore();
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.cpp b/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.cpp
index 2cf4e3cf227..cf78bc01277 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.cpp
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.cpp
@@ -18,16 +18,27 @@
#define LOG_E(s) \
LOG_ERROR_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, "[" << GraphId << "] Task: " << Task.GetId() << ". " << s)
-#define LOG_CP_T(s) \
- LOG_T("[Checkpoint " << MakeStringForLog(*PendingCheckpoint.Checkpoint) << "] " << s)
-#define LOG_CP_D(s) \
- LOG_D("[Checkpoint " << MakeStringForLog(*PendingCheckpoint.Checkpoint) << "] " << s)
-#define LOG_CP_I(s) \
- LOG_I("[Checkpoint " << MakeStringForLog(*PendingCheckpoint.Checkpoint) << "] " << s)
-#define LOG_CP_W(s) \
- LOG_W("[Checkpoint " << MakeStringForLog(*PendingCheckpoint.Checkpoint) << "] " << s)
-#define LOG_CP_E(s) \
- LOG_E("[Checkpoint " << MakeStringForLog(*PendingCheckpoint.Checkpoint) << "] " << s)
+#define LOG_CP_T(сheckpoint, s) \
+ LOG_T("[Checkpoint " << MakeStringForLog(сheckpoint) << "] " << s)
+#define LOG_CP_D(сheckpoint, s) \
+ LOG_D("[Checkpoint " << MakeStringForLog(сheckpoint) << "] " << s)
+#define LOG_CP_I(сheckpoint, s) \
+ LOG_I("[Checkpoint " << MakeStringForLog(сheckpoint) << "] " << s)
+#define LOG_CP_W(сheckpoint, s) \
+ LOG_W("[Checkpoint " << MakeStringForLog(сheckpoint) << "] " << s)
+#define LOG_CP_E(сheckpoint, s) \
+ LOG_E("[Checkpoint " << MakeStringForLog(сheckpoint) << "] " << s)
+
+#define LOG_PCP_T(s) \
+ LOG_CP_T(*PendingCheckpoint.Checkpoint, s)
+#define LOG_PCP_D(s) \
+ LOG_CP_D(*PendingCheckpoint.Checkpoint, s)
+#define LOG_PCP_I(s) \
+ LOG_CP_I(*PendingCheckpoint.Checkpoint, s)
+#define LOG_PCP_W(s) \
+ LOG_CP_W(*PendingCheckpoint.Checkpoint, s)
+#define LOG_PCP_E(s) \
+ LOG_CP_E(*PendingCheckpoint.Checkpoint, s)
namespace NYql::NDq {
@@ -227,7 +238,7 @@ void TDqComputeActorCheckpoints::Handle(TEvDqCompute::TEvInjectCheckpoint::TPtr&
YQL_ENSURE(IngressTask, "Shouldn't inject barriers into non-ingress tasks");
YQL_ENSURE(!PendingCheckpoint);
- LOG_CP_D("TEvInjectCheckpoint");
+ LOG_PCP_D("TEvInjectCheckpoint");
StartCheckpoint(ev->Get()->Record.GetCheckpoint());
ComputeActor->ResumeExecution();
}
@@ -254,7 +265,7 @@ void TDqComputeActorCheckpoints::Handle(TEvDqCompute::TEvRestoreFromCheckpoint::
ComputeActor->Stop();
StateLoadPlan = ev->Get()->Record.GetStateLoadPlan();
const auto& checkpoint = ev->Get()->Record.GetCheckpoint();
- LOG_CP_D("TEvRestoreFromCheckpoint, StateLoadPlan = " << StateLoadPlan);
+ LOG_CP_D(checkpoint, "TEvRestoreFromCheckpoint, StateLoadPlan = " << StateLoadPlan);
switch (StateLoadPlan.GetStateType()) {
case NDqProto::NDqStateLoadPlan::STATE_TYPE_EMPTY:
{
@@ -285,7 +296,7 @@ void TDqComputeActorCheckpoints::Handle(TEvDqCompute::TEvRestoreFromCheckpoint::
}
default:
{
- LOG_CP_E("Unsupported state type: "
+ LOG_CP_E(checkpoint, "Unsupported state type: "
<< NDqProto::NDqStateLoadPlan::EStateType_Name(StateLoadPlan.GetStateType()) << " (" << static_cast<int>(StateLoadPlan.GetStateType()) << ")");
EventsQueue.Send(MakeHolder<TEvDqCompute::TEvRestoreFromCheckpointResult>(checkpoint, Task.GetId(), NDqProto::TEvRestoreFromCheckpointResult::INTERNAL_ERROR));
break;
@@ -307,18 +318,18 @@ void TDqComputeActorCheckpoints::Handle(TEvDqCompute::TEvGetTaskStateResult::TPt
}
if (!ev->Get()->Issues.Empty()) {
- LOG_CP_E("TEvGetTaskStateResult error: " << ev->Get()->Issues.ToOneLineString());
+ LOG_CP_E(checkpoint, "TEvGetTaskStateResult error: " << ev->Get()->Issues.ToOneLineString());
EventsQueue.Send(MakeHolder<TEvDqCompute::TEvRestoreFromCheckpointResult>(checkpoint, Task.GetId(), NDqProto::TEvRestoreFromCheckpointResult::STORAGE_ERROR), ev->Cookie);
return;
}
if (ev->Get()->States.size() != taskIdsSize) {
- LOG_CP_E("TEvGetTaskStateResult unexpected states count: " << ev->Get()->States.size() << ", expected: " << taskIdsSize);
+ LOG_CP_E(checkpoint, "TEvGetTaskStateResult unexpected states count: " << ev->Get()->States.size() << ", expected: " << taskIdsSize);
EventsQueue.Send(MakeHolder<TEvDqCompute::TEvRestoreFromCheckpointResult>(checkpoint, Task.GetId(), NDqProto::TEvRestoreFromCheckpointResult::STORAGE_ERROR), ev->Cookie);
return;
}
- LOG_CP_D("TEvGetTaskStateResult: restoring state");
+ LOG_CP_D(checkpoint, "TEvGetTaskStateResult: restoring state");
RestoringTaskRunnerForCheckpoint = checkpoint;
RestoringTaskRunnerForEvent = ev->Cookie;
if (StateLoadPlan.GetStateType() == NDqProto::NDqStateLoadPlan::STATE_TYPE_OWN) {
@@ -336,12 +347,12 @@ void TDqComputeActorCheckpoints::Handle(TEvDqCompute::TEvGetTaskStateResult::TPt
void TDqComputeActorCheckpoints::AfterStateLoading(const TMaybe<TString>& error) {
auto& checkpoint = RestoringTaskRunnerForCheckpoint;
if (error.Defined()) {
- LOG_CP_E("Failed to load state: " << error << "ABORTED");
+ LOG_CP_E(checkpoint, "Failed to load state: " << error << "ABORTED");
EventsQueue.Send(MakeHolder<TEvDqCompute::TEvRestoreFromCheckpointResult>(checkpoint, Task.GetId(), NDqProto::TEvRestoreFromCheckpointResult::INTERNAL_ERROR), RestoringTaskRunnerForEvent);
return;
}
EventsQueue.Send(MakeHolder<TEvDqCompute::TEvRestoreFromCheckpointResult>(checkpoint, Task.GetId(), NDqProto::TEvRestoreFromCheckpointResult::OK), RestoringTaskRunnerForEvent);
- LOG_CP_D("Checkpoint state restored");
+ LOG_CP_D(checkpoint, "Checkpoint state restored");
}
void TDqComputeActorCheckpoints::Handle(TEvDqCompute::TEvRun::TPtr& ev) {
@@ -419,9 +430,9 @@ void TDqComputeActorCheckpoints::DoCheckpoint() {
Y_VERIFY(CheckpointCoordinator);
Y_VERIFY(PendingCheckpoint);
- LOG_CP_D("Performing task checkpoint");
+ LOG_PCP_D("Performing task checkpoint");
if (SaveState()) {
- LOG_CP_T("Injecting checkpoint barrier to outputs");
+ LOG_PCP_T("Injecting checkpoint barrier to outputs");
ComputeActor->InjectBarrierToOutputs(*PendingCheckpoint.Checkpoint);
TryToSavePendingCheckpoint();
}
@@ -435,7 +446,7 @@ bool TDqComputeActorCheckpoints::SaveState() {
ComputeActor->SaveState(*PendingCheckpoint.Checkpoint, PendingCheckpoint.ComputeActorState);
} catch (const std::exception& e) {
AbortCheckpoint();
- LOG_CP_E("Failed to save state: " << e.what());
+ LOG_PCP_E("Failed to save state: " << e.what());
auto resultEv = MakeHolder<TEvDqCompute::TEvSaveTaskStateResult>();
resultEv->Record.MutableCheckpoint()->CopyFrom(*PendingCheckpoint.Checkpoint);
@@ -446,7 +457,7 @@ bool TDqComputeActorCheckpoints::SaveState() {
return false;
}
- LOG_CP_T("CA state saved");
+ LOG_PCP_T("CA state saved");
return true;
}
@@ -457,7 +468,7 @@ void TDqComputeActorCheckpoints::RegisterCheckpoint(const NDqProto::TCheckpoint&
YQL_ENSURE(PendingCheckpoint.Checkpoint->GetGeneration() == checkpoint.GetGeneration());
YQL_ENSURE(PendingCheckpoint.Checkpoint->GetId() == checkpoint.GetId());
}
- LOG_CP_D("Got checkpoint barrier from channel " << channelId);
+ LOG_PCP_D("Got checkpoint barrier from channel " << channelId);
ComputeActor->ResumeExecution();
}
@@ -509,7 +520,7 @@ void TDqComputeActorCheckpoints::TryToSavePendingCheckpoint() {
saveTaskStateRequest->State.Swap(&PendingCheckpoint.ComputeActorState);
Send(CheckpointStorage, std::move(saveTaskStateRequest));
- LOG_CP_D("Task checkpoint is done. Send to storage");
+ LOG_PCP_D("Task checkpoint is done. Send to storage");
PendingCheckpoint.Clear();
SavingToDatabase = true;
}