about summary refs log tree commit diff stats
diff options
context:
space:
mode:
author yumkam <yumkam7@ydb.tech> 2024-12-04 15:51:40 +0300
committer GitHub <noreply@github.com> 2024-12-04 15:51:40 +0300
commit 181d1e0d646a852dfd85ec1e6022072c8e940c3c (patch)
tree 0d8dd936cf7667a55aead898b07172b3d0233bc4
parent 5c989e05ea13bfdbfc51ebc2b2cafc028b660baa (diff)
download ydb-181d1e0d646a852dfd85ec1e6022072c8e940c3c.tar.gz
Dump more data on async ca monitoring page (#11706)
-rw-r--r-- ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp 198
-rw-r--r-- ydb/library/yql/dq/actors/compute/dq_compute_actor_async_input_helper.h 2
-rw-r--r-- ydb/library/yql/dq/runtime/dq_async_output.h 2
3 files changed, 122 insertions, 80 deletions
diff --git a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp
index 01be5952071..ed045434a9a 100644
--- a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp
+++ b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp
@@ -173,6 +173,17 @@ private:
html << "<h3>State</h3>";
html << "<pre>" << ComputeActorState.DebugString() << "</pre>";
+#define DUMP(P, X) html << #X ": " << P.X << "<br />"
+ html << "<h4>ProcessSourcesState</h4>";
+ DUMP(ProcessSourcesState, Inflight);
+ html << "<h4>ProcessOutputsState</h4>";
+ DUMP(ProcessOutputsState, Inflight);
+ DUMP(ProcessOutputsState, ChannelsReady);
+ DUMP(ProcessOutputsState, HasDataToSend);
+ DUMP(ProcessOutputsState, AllOutputsFinished);
+ DUMP(ProcessOutputsState, LastRunStatus);
+ DUMP(ProcessOutputsState, LastPopReturnedNoData);
+
html << "<h3>Watermarks</h3>";
for (const auto& [time, id]: WatermarkTakeInputChannelDataRequests) {
html << "WatermarkTakeInputChannelDataRequests: " << time.ToString() << " " << id << "<br />";
@@ -202,22 +213,57 @@ private:
html << "CpuTimeQuotaAsked: " << CpuTimeQuotaAsked.ToString() << "<br />";
html << "UseCpuQuota: " << UseCpuQuota() << "<br />";
+
html << "<h3>Checkpoints</h3>";
html << "ReadyToCheckpoint: " << ReadyToCheckpoint() << "<br />";
html << "CheckpointRequestedFromTaskRunner: " << CheckpointRequestedFromTaskRunner << "<br />";
+ auto dumpAsyncStats = [&](auto prefix, auto& asyncStats) { // Writes the async-stats fields of asyncStats to html, one "<prefix><Field>: <value><br />" entry per field.
+ html << prefix << "Level: " << static_cast<int>(asyncStats.Level) << "<br />"; // cast so the value is streamed as an integer (Level is presumably an enum - confirm)
+ html << prefix << "MinWaitDuration: " << asyncStats.MinWaitDuration.ToString() << "<br />";
+ html << prefix << "CurrentPauseTs: " << (asyncStats.CurrentPauseTs ? asyncStats.CurrentPauseTs->ToString() : TString{}) << "<br />"; // empty value when CurrentPauseTs is not set
+ html << prefix << "MergeWaitPeriod: " << asyncStats.MergeWaitPeriod << "<br />";
+ html << prefix << "Bytes: " << asyncStats.Bytes << "<br />";
+ html << prefix << "DecompressedBytes: " << asyncStats.DecompressedBytes << "<br />";
+ html << prefix << "Rows: " << asyncStats.Rows << "<br />";
+ html << prefix << "Chunks: " << asyncStats.Chunks << "<br />";
+ html << prefix << "Splits: " << asyncStats.Splits << "<br />";
+ html << prefix << "FirstMessageTs: " << asyncStats.FirstMessageTs.ToString() << "<br />";
+ html << prefix << "PauseMessageTs: " << asyncStats.PauseMessageTs.ToString() << "<br />";
+ html << prefix << "ResumeMessageTs: " << asyncStats.ResumeMessageTs.ToString() << "<br />";
+ html << prefix << "LastMessageTs: " << asyncStats.LastMessageTs.ToString() << "<br />";
+ html << prefix << "WaitTime: " << asyncStats.WaitTime.ToString() << "<br />";
+ };
+
+ auto dumpOutputStats = [&](auto prefix, auto& outputStats) { // Writes the two memory maxima of outputStats to html, then its async-stats fields.
+ html << prefix << "MaxMemoryUsage: " << outputStats.MaxMemoryUsage << "<br />";
+ html << prefix << "MaxRowsInMemory: " << outputStats.MaxRowsInMemory << "<br />";
+ dumpAsyncStats(prefix, outputStats); // same prefix, so every field of this stats object shares one label prefix
+ };
+
+ auto dumpInputChannelStats = [&](auto prefix, auto& pushStats) { // Writes the channel-specific fields of pushStats to html, then its async-stats fields.
+ html << prefix << "ChannelId: " << pushStats.ChannelId << "<br />";
+ html << prefix << "SrcStageId: " << pushStats.SrcStageId << "<br />";
+ html << prefix << "RowsInMemory: " << pushStats.RowsInMemory << "<br />";
+ html << prefix << "MaxMemoryUsage: " << pushStats.MaxMemoryUsage << "<br />";
+ html << prefix << "DeserializationTime: " << pushStats.DeserializationTime.ToString() << "<br />";
+ dumpAsyncStats(prefix, pushStats); // same prefix, so every field of this stats object shares one label prefix
+ };
+
+ auto dumpInputStats = dumpAsyncStats; // Copy of the dumpAsyncStats closure under a second name; it prints exactly the same fields.
+
html << "<h3>InputChannels</h3>";
for (const auto& [id, info]: InputChannelsMap) {
html << "<h4>Input Channel Id: " << id << "</h4>";
- html << "LogPrefix: " << info.LogPrefix << "<br />";
- html << "ChannelId: " << info.ChannelId << "<br />";
- html << "SrcStageId: " << info.SrcStageId << "<br />";
- html << "HasPeer: " << info.HasPeer << "<br />";
+ DUMP(info, LogPrefix);
+ DUMP(info, ChannelId);
+ DUMP(info, SrcStageId);
+ DUMP(info, HasPeer);
html << "PendingWatermarks: " << !info.PendingWatermarks.empty() << " " << (info.PendingWatermarks.empty() ? TString{} : info.PendingWatermarks.back().ToString()) << "<br />";
html << "WatermarksMode: " << NDqProto::EWatermarksMode_Name(info.WatermarksMode) << "<br />";
html << "PendingCheckpoint: " << info.PendingCheckpoint.has_value() << " " << (info.PendingCheckpoint ? TStringBuilder{} << info.PendingCheckpoint->GetId() << " " << info.PendingCheckpoint->GetGeneration() : TString{}) << "<br />";
html << "CheckpointingMode: " << NDqProto::ECheckpointingMode_Name(info.CheckpointingMode) << "<br />";
- html << "FreeSpace: " << info.FreeSpace << "<br />";
+ DUMP(info, FreeSpace);
html << "IsPaused: " << info.IsPaused() << "<br />";
if (info.Channel) {
html << "DqInputChannel.ChannelId: " << info.Channel->GetChannelId() << "<br />";
@@ -229,45 +275,18 @@ private:
html << "DqInputChannel.IsFinished: " << info.Channel->IsFinished() << "<br />";
const auto& pushStats = info.Channel->GetPushStats();
- html << "DqInputChannel.PushStats.ChannelId: " << pushStats.ChannelId << "<br />";
- html << "DqInputChannel.PushStats.SrcStageId: " << pushStats.SrcStageId << "<br />";
- html << "DqInputChannel.PushStats.RowsInMemory: " << pushStats.RowsInMemory << "<br />";
- html << "DqInputChannel.PushStats.MaxMemoryUsage: " << pushStats.MaxMemoryUsage << "<br />";
- html << "DqInputChannel.PushStats.DeserializationTime: " << pushStats.DeserializationTime.ToString() << "<br />";
- html << "DqInputChannel.PushStats.Level: " << static_cast<int>(pushStats.Level) << "<br />";
- html << "DqInputChannel.PushStats.MinWaitDuration: " << pushStats.MinWaitDuration.ToString() << "<br />";
- html << "DqInputChannel.PushStats.CurrentPauseTs: " << (pushStats.CurrentPauseTs ? pushStats.CurrentPauseTs->ToString() : TString{}) << "<br />";
- html << "DqInputChannel.PushStats.MergeWaitPeriod: " << pushStats.MergeWaitPeriod << "<br />";
- html << "DqInputChannel.PushStats.Bytes: " << pushStats.Bytes << "<br />";
- html << "DqInputChannel.PushStats.DecompressedBytes: " << pushStats.DecompressedBytes << "<br />";
- html << "DqInputChannel.PushStats.Rows: " << pushStats.Rows << "<br />";
- html << "DqInputChannel.PushStats.Chunks: " << pushStats.Chunks << "<br />";
- html << "DqInputChannel.PushStats.Splits: " << pushStats.Splits << "<br />";
- html << "DqInputChannel.PushStats.FirstMessageTs: " << pushStats.FirstMessageTs.ToString() << "<br />";
- html << "DqInputChannel.PushStats.PauseMessageTs: " << pushStats.PauseMessageTs.ToString() << "<br />";
- html << "DqInputChannel.PushStats.ResumeMessageTs: " << pushStats.ResumeMessageTs.ToString() << "<br />";
- html << "DqInputChannel.PushStats.LastMessageTs: " << pushStats.LastMessageTs.ToString() << "<br />";
- html << "DqInputChannel.PushStats.WaitTime: " << pushStats.WaitTime.ToString() << "<br />";
+ dumpInputChannelStats("DqInputChannel.PushStats.", pushStats);
const auto& popStats = info.Channel->GetPopStats();
- html << "DqInputChannel.PopStats.Bytes: " << popStats.Bytes << "<br />";
- html << "DqInputChannel.PopStats.DecompressedBytes: " << popStats.DecompressedBytes << "<br />";
- html << "DqInputChannel.PopStats.Rows: " << popStats.Rows << "<br />";
- html << "DqInputChannel.PopStats.Chunks: " << popStats.Chunks << "<br />";
- html << "DqInputChannel.PopStats.Splits: " << popStats.Splits << "<br />";
- html << "DqInputChannel.PopStats.FirstMessageTs: " << popStats.FirstMessageTs.ToString() << "<br />";
- html << "DqInputChannel.PopStats.PauseMessageTs: " << popStats.PauseMessageTs.ToString() << "<br />";
- html << "DqInputChannel.PopStats.ResumeMessageTs: " << popStats.ResumeMessageTs.ToString() << "<br />";
- html << "DqInputChannel.PopStats.LastMessageTs: " << popStats.LastMessageTs.ToString() << "<br />";
- html << "DqInputChannel.PopStats.WaitTime: " << popStats.WaitTime.ToString() << "<br />";
+ dumpInputStats("DqInputChannel.PopStats."sv, popStats);
}
}
html << "<h3>InputTransform</h3>";
for (const auto& [id, info]: InputTransformsMap) {
html << "<h4>Input Transform Id: " << id << "</h4>";
- html << "LogPrefix: " << info.LogPrefix << "<br />";
- html << "Type: " << info.Type << "<br />";
+ DUMP(info, LogPrefix);
+ DUMP(info, Type);
html << "PendingWatermark: " << !!info.PendingWatermark << " " << (!info.PendingWatermark ? TString{} : info.PendingWatermark->ToString()) << "<br />";
html << "WatermarksMode: " << NDqProto::EWatermarksMode_Name(info.WatermarksMode) << "<br />";
html << "FreeSpace: " << info.GetFreeSpace() << "<br />";
@@ -279,32 +298,30 @@ private:
html << "DqInputBuffer.InputType: " << (info.Buffer->GetInputType() ? info.Buffer->GetInputType()->GetKindAsStr() : TString{"unknown"}) << "<br />";
html << "DqInputBuffer.InputWidth: " << (info.Buffer->GetInputWidth() ? ToString(*info.Buffer->GetInputWidth()) : TString{"unknown"}) << "<br />";
html << "DqInputBuffer.IsFinished: " << info.Buffer->IsFinished() << "<br />";
+ html << "DqInputBuffer.IsPaused: " << info.Buffer->IsPaused() << "<br />";
html << "DqInputBuffer.IsPending: " << info.Buffer->IsPending() << "<br />";
const auto& popStats = info.Buffer->GetPopStats();
- html << "DqInputBuffer.PopStats.Bytes: " << popStats.Bytes << "<br />";
- html << "DqInputBuffer.PopStats.DecompressedBytes: " << popStats.DecompressedBytes << "<br />";
- html << "DqInputBuffer.PopStats.Rows: " << popStats.Rows << "<br />";
- html << "DqInputBuffer.PopStats.Chunks: " << popStats.Chunks << "<br />";
- html << "DqInputBuffer.PopStats.Splits: " << popStats.Splits << "<br />";
- html << "DqInputBuffer.PopStats.FirstMessageTs: " << popStats.FirstMessageTs.ToString() << "<br />";
- html << "DqInputBuffer.PopStats.PauseMessageTs: " << popStats.PauseMessageTs.ToString() << "<br />";
- html << "DqInputBuffer.PopStats.ResumeMessageTs: " << popStats.ResumeMessageTs.ToString() << "<br />";
- html << "DqInputBuffer.PopStats.LastMessageTs: " << popStats.LastMessageTs.ToString() << "<br />";
- html << "DqInputChannel.PopStats.WaitTime: " << popStats.WaitTime.ToString() << "<br />";
+ dumpInputStats("DqInputBuffer.PopStats."sv, popStats); // keep the "PopStats." segment: the lines this call replaces printed it, and the other stats dumps name their stats object in the prefix
+ }
+ if (info.AsyncInput) {
+ const auto& input = *info.AsyncInput;
+ html << "AsyncInput.InputIndex: " << input.GetInputIndex() << "<br />";
+ const auto& ingressStats = input.GetIngressStats();
+ dumpAsyncStats("AsyncInput.IngressStats."sv, ingressStats);
}
}
html << "<h3>OutputChannels</h3>";
for (const auto& [id, info]: OutputChannelsMap) {
html << "<h4>Input Channel Id: " << id << "</h4>";
- html << "ChannelId: " << info.ChannelId << "<br />";
- html << "DstStageId: " << info.DstStageId << "<br />";
- html << "HasPeer: " << info.HasPeer << "<br />";
- html << "Finished: " << info.Finished << "<br />";
- html << "EarlyFinish: " << info.EarlyFinish << "<br />";
- html << "PopStarted: " << info.PopStarted << "<br />";
- html << "IsTransformOutput: " << info.IsTransformOutput << "<br />";
+ DUMP(info, ChannelId);
+ DUMP(info, DstStageId);
+ DUMP(info, HasPeer);
+ DUMP(info, Finished);
+ DUMP(info, EarlyFinish);
+ DUMP(info, PopStarted);
+ DUMP(info, IsTransformOutput);
html << "EWatermarksMode: " << NDqProto::EWatermarksMode_Name(info.WatermarksMode) << "<br />";
if (info.AsyncData) {
@@ -324,21 +341,7 @@ private:
html << "DqInputChannel.OutputType: " << (info.Channel->GetOutputType() ? info.Channel->GetOutputType()->GetKindAsStr() : TString{"unknown"}) << "<br />";
const auto& pushStats = info.Channel->GetPushStats();
- html << "DqOutputChannel.PushStats.MaxRowsInMemory: " << pushStats.MaxRowsInMemory << "<br />";
- html << "DqOutputChannel.PushStats.MaxMemoryUsage: " << pushStats.MaxMemoryUsage << "<br />";
- html << "DqOutputChannel.PushStats.Level: " << static_cast<int>(pushStats.Level) << "<br />";
- html << "DqOutputChannel.PushStats.MinWaitDuration: " << pushStats.MinWaitDuration.ToString() << "<br />";
- html << "DqOutputChannel.PushStats.CurrentPauseTs: " << (pushStats.CurrentPauseTs ? pushStats.CurrentPauseTs->ToString() : TString{}) << "<br />";
- html << "DqOutputChannel.PushStats.MergeWaitPeriod: " << pushStats.MergeWaitPeriod << "<br />";
- html << "DqOutputChannel.PushStats.Bytes: " << pushStats.Bytes << "<br />";
- html << "DqOutputChannel.PushStats.Rows: " << pushStats.Rows << "<br />";
- html << "DqOutputChannel.PushStats.Chunks: " << pushStats.Chunks << "<br />";
- html << "DqOutputChannel.PushStats.Splits: " << pushStats.Splits << "<br />";
- html << "DqOutputChannel.PushStats.FirstMessageTs: " << pushStats.FirstMessageTs.ToString() << "<br />";
- html << "DqOutputChannel.PushStats.PauseMessageTs: " << pushStats.PauseMessageTs.ToString() << "<br />";
- html << "DqOutputChannel.PushStats.ResumeMessageTs: " << pushStats.ResumeMessageTs.ToString() << "<br />";
- html << "DqOutputChannel.PushStats.LastMessageTs: " << pushStats.LastMessageTs.ToString() << "<br />";
- html << "DqOutputChannel.PushStats.WaitTime: " << pushStats.WaitTime.ToString() << "<br />";
+ dumpOutputStats("DqOutputChannel.PushStats."sv, pushStats);
const auto& popStats = info.Channel->GetPopStats();
html << "DqOutputChannel.PopStats.ChannelId: " << popStats.ChannelId << "<br />";
@@ -349,17 +352,58 @@ private:
html << "DqOutputChannel.PopStats.SpilledBytes: " << popStats.SpilledBytes << "<br />";
html << "DqOutputChannel.PopStats.SpilledRows: " << popStats.SpilledRows << "<br />";
html << "DqOutputChannel.PopStats.SpilledBlobs: " << popStats.SpilledBlobs << "<br />";
- html << "DqOutputChannel.PopStats.Bytes: " << popStats.Bytes << "<br />";
- html << "DqOutputChannel.PopStats.Rows: " << popStats.Rows << "<br />";
- html << "DqOutputChannel.PopStats.Chunks: " << popStats.Chunks << "<br />";
- html << "DqOutputChannel.PopStats.Splits: " << popStats.Splits << "<br />";
- html << "DqOutputChannel.PopStats.FirstMessageTs: " << popStats.FirstMessageTs.ToString() << "<br />";
- html << "DqOutputChannel.PopStats.PauseMessageTs: " << popStats.PauseMessageTs.ToString() << "<br />";
- html << "DqOutputChannel.PopStats.ResumeMessageTs: " << popStats.ResumeMessageTs.ToString() << "<br />";
- html << "DqOutputChannel.PopStats.LastMessageTs: " << popStats.LastMessageTs.ToString() << "<br />";
- html << "DqOutputChannel.PopStats.WaitTime: " << popStats.WaitTime.ToString() << "<br />";
+ dumpOutputStats("DqOutputChannel.PopStats."sv, popStats);
+ }
+ }
+
+ html << "<h3>Sinks</h3>";
+ for (const auto& [id, info]: SinksMap) {
+ html << "<h4>Sink Id: " << id << "</h4>";
+ DUMP(info, Type);
+ DUMP(info, FreeSpaceBeforeSend);
+ DUMP(info, Finished);
+ DUMP(info, FinishIsAcknowledged);
+ DUMP(info, PopStarted);
+ if (info.Buffer) {
+ const auto& buffer = *info.Buffer;
+ html << "DqOutputBuffer.OutputIndex: " << buffer.GetOutputIndex() << "<br />";
+ html << "DqOutputBuffer.IsFull: " << buffer.IsFull() << "<br />";
+ html << "DqOutputBuffer.OutputType: " << (buffer.GetOutputType() ? buffer.GetOutputType()->GetKindAsStr() : TString{"unknown"}) << "<br />";
+ html << "DqOutputBuffer.IsFinished: " << buffer.IsFinished() << "<br />";
+ html << "DqOutputBuffer.HasData: " << buffer.HasData() << "<br />";
+
+ const auto& pushStats = buffer.GetPushStats();
+ dumpOutputStats("DqOutputBuffer.PushStats."sv, pushStats);
+
+ const auto& popStats = buffer.GetPopStats();
+ dumpOutputStats("DqOutputBuffer.PopStats."sv, popStats);
+ }
+ if (info.AsyncOutput) {
+ const auto& output = *info.AsyncOutput;
+ html << "AsyncOutput.OutputIndex: " << output.GetOutputIndex() << "<br />";
+ html << "AsyncOutput.FreeSpace: " << output.GetFreeSpace() << "<br />";
+ const auto& egressStats = output.GetEgressStats();
+ dumpAsyncStats("AsyncOutput.EgressStats."sv, egressStats);
+ }
+ }
+
+ html << "<h3>Sources</h3>";
+ for (const auto& [id, info]: SourcesMap) {
+ html << "<h4>Source Id: " << id << "</h4>";
+ DUMP(info, Type);
+ DUMP(info, LogPrefix);
+ DUMP(info, Index);
+ DUMP(info, Finished);
+ html << "IsPausedByWatermark: " << info.IsPausedByWatermark() << "<br />";
+ html << "FreeSpace: " << info.GetFreeSpace() << "<br />";
+ if (info.AsyncInput) {
+ const auto& input = *info.AsyncInput;
+ html << "AsyncInput.InputIndex: " << input.GetInputIndex() << "<br />";
+ const auto& ingressStats = input.GetIngressStats();
+ dumpAsyncStats("AsyncInput.IngressStats."sv, ingressStats);
}
}
+#undef DUMP
Send(ev->Sender, new NActors::NMon::TEvHttpInfoRes(html.Str()));
}
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_input_helper.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_input_helper.h
index 0e7ad1bff7c..22aa7e6f586 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_input_helper.h
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_input_helper.h
@@ -35,7 +35,7 @@ public:
, IssuesBuffer(IssuesBufferSize)
, WatermarksMode(watermarksMode) {}
- bool IsPausedByWatermark() {
+ bool IsPausedByWatermark() const { // Returns whether PendingWatermark is defined; presumably made const so it can be called through a const reference - confirm against callers.
return PendingWatermark.Defined();
}
diff --git a/ydb/library/yql/dq/runtime/dq_async_output.h b/ydb/library/yql/dq/runtime/dq_async_output.h
index d399080dfa4..736b2d7b2c3 100644
--- a/ydb/library/yql/dq/runtime/dq_async_output.h
+++ b/ydb/library/yql/dq/runtime/dq_async_output.h
@@ -11,8 +11,6 @@ namespace NYql::NDq {
struct TDqAsyncOutputBufferStats : TDqOutputStats { // Extends TDqOutputStats with OutputIndex and Type. NOTE(review): the two Max* fields below are removed by this change; presumably the base struct already provides them - confirm.
ui64 OutputIndex = 0;
TString Type;
- ui64 MaxMemoryUsage = 0;
- ui64 MaxRowsInMemory = 0;
};
class IDqAsyncOutputBuffer : public IDqOutput {