aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVasily Gerasimov <UgnineSirdis@gmail.com>2022-05-16 22:57:51 +0300
committerVasily Gerasimov <UgnineSirdis@gmail.com>2022-05-16 22:57:51 +0300
commit45e5e9ae890daf63ba9a7a9550c1d64ac7e4734d (patch)
tree0626453c514d3a126ba8f147742e976d5031fc1f
parenta80aeb83feed17a178ad17a0a60fc49ef05c16f3 (diff)
downloadydb-45e5e9ae890daf63ba9a7a9550c1d64ac7e4734d.tar.gz
YQ-1054 Additional logging for saving solomon sink state
Additional logging for saving solomon sink state ref:a50aa0a1c018926ac57375b6e5756aca2be21191
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.cpp1
-rw-r--r--ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.cpp5
2 files changed, 5 insertions, 1 deletions
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.cpp b/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.cpp
index 218411069a..d80a4f9700 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.cpp
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.cpp
@@ -456,6 +456,7 @@ void TDqComputeActorCheckpoints::OnSinkStateSaved(NDqProto::TSinkState&& state,
*sinkState = std::move(state);
sinkState->SetOutputIndex(outputIndex); // Set index explicitly to avoid errors
++PendingCheckpoint.SavedSinkStatesCount;
+ LOG_D("Sink[" << outputIndex << "] state saved");
TryToSavePendingCheckpoint();
}
diff --git a/ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.cpp b/ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.cpp
index 62a849057a..1475bedb58 100644
--- a/ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.cpp
+++ b/ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.cpp
@@ -142,7 +142,9 @@ public:
const TMaybe<NDqProto::TCheckpoint>& checkpoint,
bool) override
{
- SINK_LOG_D("Got " << batch.size() << " items to send");
+ SINK_LOG_D("Got " << batch.size() << " items to send. Checkpoint: " << checkpoint.Defined()
+ << ". Send queue: " << SendingBuffer.size() << ". Inflight: " << InflightBuffer.size()
+ << ". Checkpoint in progress: " << CheckpointInProgress.has_value());
ui64 metricsCount = 0;
for (const auto& item : batch) {
@@ -365,6 +367,7 @@ private:
}
if (std::holds_alternative<NDqProto::TCheckpoint>(variant)) {
+ SINK_LOG_D("Process checkpoint. Inflight before checkpoint: " << InflightBuffer.size());
CheckpointInProgress = std::get<NDqProto::TCheckpoint>(std::move(variant));
if (InflightBuffer.empty()) {
DoCheckpoint();