diff options
author | yumkam <yumkam7@ydb.tech> | 2024-12-04 15:51:40 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-12-04 15:51:40 +0300 |
commit | 181d1e0d646a852dfd85ec1e6022072c8e940c3c (patch) | |
tree | 0d8dd936cf7667a55aead898b07172b3d0233bc4 | |
parent | 5c989e05ea13bfdbfc51ebc2b2cafc028b660baa (diff) | |
download | ydb-181d1e0d646a852dfd85ec1e6022072c8e940c3c.tar.gz |
Dump more data on async ca monitoring page (#11706)
3 files changed, 122 insertions, 80 deletions
diff --git a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp index 01be5952071..ed045434a9a 100644 --- a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp +++ b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp @@ -173,6 +173,17 @@ private: html << "<h3>State</h3>"; html << "<pre>" << ComputeActorState.DebugString() << "</pre>"; +#define DUMP(P, X) html << #X ": " << P.X << "<br />" + html << "<h4>ProcessSourcesState</h4>"; + DUMP(ProcessSourcesState, Inflight); + html << "<h4>ProcessOutputsState</h4>"; + DUMP(ProcessOutputsState, Inflight); + DUMP(ProcessOutputsState, ChannelsReady); + DUMP(ProcessOutputsState, HasDataToSend); + DUMP(ProcessOutputsState, AllOutputsFinished); + DUMP(ProcessOutputsState, LastRunStatus); + DUMP(ProcessOutputsState, LastPopReturnedNoData); + html << "<h3>Watermarks</h3>"; for (const auto& [time, id]: WatermarkTakeInputChannelDataRequests) { html << "WatermarkTakeInputChannelDataRequests: " << time.ToString() << " " << id << "<br />"; @@ -202,22 +213,57 @@ private: html << "CpuTimeQuotaAsked: " << CpuTimeQuotaAsked.ToString() << "<br />"; html << "UseCpuQuota: " << UseCpuQuota() << "<br />"; + html << "<h3>Checkpoints</h3>"; html << "ReadyToCheckpoint: " << ReadyToCheckpoint() << "<br />"; html << "CheckpointRequestedFromTaskRunner: " << CheckpointRequestedFromTaskRunner << "<br />"; + auto dumpAsyncStats = [&](auto prefix, auto& asyncStats) { + html << prefix << "Level: " << static_cast<int>(asyncStats.Level) << "<br />"; + html << prefix << "MinWaitDuration: " << asyncStats.MinWaitDuration.ToString() << "<br />"; + html << prefix << "CurrentPauseTs: " << (asyncStats.CurrentPauseTs ? asyncStats.CurrentPauseTs->ToString() : TString{}) << "<br />"; + html << prefix << "MergeWaitPeriod: " << asyncStats.MergeWaitPeriod << "<br />"; + html << prefix << "Bytes: " << asyncStats.Bytes << "<br />"; + html << prefix << "DecompressedBytes: " << asyncStats.DecompressedBytes << "<br />"; + html << prefix << "Rows: " << asyncStats.Rows << "<br />"; + html << prefix << "Chunks: " << asyncStats.Chunks << "<br />"; + html << prefix << "Splits: " << asyncStats.Splits << "<br />"; + html << prefix << "FirstMessageTs: " << asyncStats.FirstMessageTs.ToString() << "<br />"; + html << prefix << "PauseMessageTs: " << asyncStats.PauseMessageTs.ToString() << "<br />"; + html << prefix << "ResumeMessageTs: " << asyncStats.ResumeMessageTs.ToString() << "<br />"; + html << prefix << "LastMessageTs: " << asyncStats.LastMessageTs.ToString() << "<br />"; + html << prefix << "WaitTime: " << asyncStats.WaitTime.ToString() << "<br />"; + }; + + auto dumpOutputStats = [&](auto prefix, auto& outputStats) { + html << prefix << "MaxMemoryUsage: " << outputStats.MaxMemoryUsage << "<br />"; + html << prefix << "MaxRowsInMemory: " << outputStats.MaxRowsInMemory << "<br />"; + dumpAsyncStats(prefix, outputStats); + }; + + auto dumpInputChannelStats = [&](auto prefix, auto& pushStats) { + html << prefix << "ChannelId: " << pushStats.ChannelId << "<br />"; + html << prefix << "SrcStageId: " << pushStats.SrcStageId << "<br />"; + html << prefix << "RowsInMemory: " << pushStats.RowsInMemory << "<br />"; + html << prefix << "MaxMemoryUsage: " << pushStats.MaxMemoryUsage << "<br />"; + html << prefix << "DeserializationTime: " << pushStats.DeserializationTime.ToString() << "<br />"; + dumpAsyncStats(prefix, pushStats); + }; + + auto dumpInputStats = dumpAsyncStats; + html << "<h3>InputChannels</h3>"; for (const auto& [id, info]: InputChannelsMap) { html << "<h4>Input Channel Id: " << id << "</h4>"; - html << "LogPrefix: " << info.LogPrefix << "<br />"; - html << "ChannelId: " << info.ChannelId << "<br />"; - html << "SrcStageId: " << info.SrcStageId << "<br />"; - html << "HasPeer: " << info.HasPeer << "<br />"; + DUMP(info, LogPrefix); + DUMP(info, ChannelId); + DUMP(info, SrcStageId); + DUMP(info, HasPeer); html << "PendingWatermarks: " << !info.PendingWatermarks.empty() << " " << (info.PendingWatermarks.empty() ? TString{} : info.PendingWatermarks.back().ToString()) << "<br />"; html << "WatermarksMode: " << NDqProto::EWatermarksMode_Name(info.WatermarksMode) << "<br />"; html << "PendingCheckpoint: " << info.PendingCheckpoint.has_value() << " " << (info.PendingCheckpoint ? TStringBuilder{} << info.PendingCheckpoint->GetId() << " " << info.PendingCheckpoint->GetGeneration() : TString{}) << "<br />"; html << "CheckpointingMode: " << NDqProto::ECheckpointingMode_Name(info.CheckpointingMode) << "<br />"; - html << "FreeSpace: " << info.FreeSpace << "<br />"; + DUMP(info, FreeSpace); html << "IsPaused: " << info.IsPaused() << "<br />"; if (info.Channel) { html << "DqInputChannel.ChannelId: " << info.Channel->GetChannelId() << "<br />"; @@ -229,45 +275,18 @@ private: html << "DqInputChannel.IsFinished: " << info.Channel->IsFinished() << "<br />"; const auto& pushStats = info.Channel->GetPushStats(); - html << "DqInputChannel.PushStats.ChannelId: " << pushStats.ChannelId << "<br />"; - html << "DqInputChannel.PushStats.SrcStageId: " << pushStats.SrcStageId << "<br />"; - html << "DqInputChannel.PushStats.RowsInMemory: " << pushStats.RowsInMemory << "<br />"; - html << "DqInputChannel.PushStats.MaxMemoryUsage: " << pushStats.MaxMemoryUsage << "<br />"; - html << "DqInputChannel.PushStats.DeserializationTime: " << pushStats.DeserializationTime.ToString() << "<br />"; - html << "DqInputChannel.PushStats.Level: " << static_cast<int>(pushStats.Level) << "<br />"; - html << "DqInputChannel.PushStats.MinWaitDuration: " << pushStats.MinWaitDuration.ToString() << "<br />"; - html << "DqInputChannel.PushStats.CurrentPauseTs: " << (pushStats.CurrentPauseTs ? pushStats.CurrentPauseTs->ToString() : TString{}) << "<br />"; - html << "DqInputChannel.PushStats.MergeWaitPeriod: " << pushStats.MergeWaitPeriod << "<br />"; - html << "DqInputChannel.PushStats.Bytes: " << pushStats.Bytes << "<br />"; - html << "DqInputChannel.PushStats.DecompressedBytes: " << pushStats.DecompressedBytes << "<br />"; - html << "DqInputChannel.PushStats.Rows: " << pushStats.Rows << "<br />"; - html << "DqInputChannel.PushStats.Chunks: " << pushStats.Chunks << "<br />"; - html << "DqInputChannel.PushStats.Splits: " << pushStats.Splits << "<br />"; - html << "DqInputChannel.PushStats.FirstMessageTs: " << pushStats.FirstMessageTs.ToString() << "<br />"; - html << "DqInputChannel.PushStats.PauseMessageTs: " << pushStats.PauseMessageTs.ToString() << "<br />"; - html << "DqInputChannel.PushStats.ResumeMessageTs: " << pushStats.ResumeMessageTs.ToString() << "<br />"; - html << "DqInputChannel.PushStats.LastMessageTs: " << pushStats.LastMessageTs.ToString() << "<br />"; - html << "DqInputChannel.PushStats.WaitTime: " << pushStats.WaitTime.ToString() << "<br />"; + dumpInputChannelStats("DqInputChannel.PushStats.", pushStats); const auto& popStats = info.Channel->GetPopStats(); - html << "DqInputChannel.PopStats.Bytes: " << popStats.Bytes << "<br />"; - html << "DqInputChannel.PopStats.DecompressedBytes: " << popStats.DecompressedBytes << "<br />"; - html << "DqInputChannel.PopStats.Rows: " << popStats.Rows << "<br />"; - html << "DqInputChannel.PopStats.Chunks: " << popStats.Chunks << "<br />"; - html << "DqInputChannel.PopStats.Splits: " << popStats.Splits << "<br />"; - html << "DqInputChannel.PopStats.FirstMessageTs: " << popStats.FirstMessageTs.ToString() << "<br />"; - html << "DqInputChannel.PopStats.PauseMessageTs: " << popStats.PauseMessageTs.ToString() << "<br />"; - html << "DqInputChannel.PopStats.ResumeMessageTs: " << popStats.ResumeMessageTs.ToString() << "<br />"; - html << "DqInputChannel.PopStats.LastMessageTs: " << popStats.LastMessageTs.ToString() << "<br />"; - html << "DqInputChannel.PopStats.WaitTime: " << popStats.WaitTime.ToString() << "<br />"; + dumpInputStats("DqInputChannel.PopStats."sv, popStats); } } html << "<h3>InputTransform</h3>"; for (const auto& [id, info]: InputTransformsMap) { html << "<h4>Input Transform Id: " << id << "</h4>"; - html << "LogPrefix: " << info.LogPrefix << "<br />"; - html << "Type: " << info.Type << "<br />"; + DUMP(info, LogPrefix); + DUMP(info, Type); html << "PendingWatermark: " << !!info.PendingWatermark << " " << (!info.PendingWatermark ? TString{} : info.PendingWatermark->ToString()) << "<br />"; html << "WatermarksMode: " << NDqProto::EWatermarksMode_Name(info.WatermarksMode) << "<br />"; html << "FreeSpace: " << info.GetFreeSpace() << "<br />"; @@ -279,32 +298,30 @@ private: html << "DqInputBuffer.InputType: " << (info.Buffer->GetInputType() ? info.Buffer->GetInputType()->GetKindAsStr() : TString{"unknown"}) << "<br />"; html << "DqInputBuffer.InputWidth: " << (info.Buffer->GetInputWidth() ? ToString(*info.Buffer->GetInputWidth()) : TString{"unknown"}) << "<br />"; html << "DqInputBuffer.IsFinished: " << info.Buffer->IsFinished() << "<br />"; + html << "DqInputBuffer.IsPaused: " << info.Buffer->IsPaused() << "<br />"; html << "DqInputBuffer.IsPending: " << info.Buffer->IsPending() << "<br />"; const auto& popStats = info.Buffer->GetPopStats(); - html << "DqInputBuffer.PopStats.Bytes: " << popStats.Bytes << "<br />"; - html << "DqInputBuffer.PopStats.DecompressedBytes: " << popStats.DecompressedBytes << "<br />"; - html << "DqInputBuffer.PopStats.Rows: " << popStats.Rows << "<br />"; - html << "DqInputBuffer.PopStats.Chunks: " << popStats.Chunks << "<br />"; - html << "DqInputBuffer.PopStats.Splits: " << popStats.Splits << "<br />"; - html << "DqInputBuffer.PopStats.FirstMessageTs: " << popStats.FirstMessageTs.ToString() << "<br />"; - html << "DqInputBuffer.PopStats.PauseMessageTs: " << popStats.PauseMessageTs.ToString() << "<br />"; - html << "DqInputBuffer.PopStats.ResumeMessageTs: " << popStats.ResumeMessageTs.ToString() << "<br />"; - html << "DqInputBuffer.PopStats.LastMessageTs: " << popStats.LastMessageTs.ToString() << "<br />"; - html << "DqInputChannel.PopStats.WaitTime: " << popStats.WaitTime.ToString() << "<br />"; + dumpInputStats("DqInputBuffer."sv, popStats); + } + if (info.AsyncInput) { + const auto& input = *info.AsyncInput; + html << "AsyncInput.InputIndex: " << input.GetInputIndex() << "<br />"; + const auto& ingressStats = input.GetIngressStats(); + dumpAsyncStats("AsyncInput.IngressStats."sv, ingressStats); } } html << "<h3>OutputChannels</h3>"; for (const auto& [id, info]: OutputChannelsMap) { html << "<h4>Input Channel Id: " << id << "</h4>"; - html << "ChannelId: " << info.ChannelId << "<br />"; - html << "DstStageId: " << info.DstStageId << "<br />"; - html << "HasPeer: " << info.HasPeer << "<br />"; - html << "Finished: " << info.Finished << "<br />"; - html << "EarlyFinish: " << info.EarlyFinish << "<br />"; - html << "PopStarted: " << info.PopStarted << "<br />"; - html << "IsTransformOutput: " << info.IsTransformOutput << "<br />"; + DUMP(info, ChannelId); + DUMP(info, DstStageId); + DUMP(info, HasPeer); + DUMP(info, Finished); + DUMP(info, EarlyFinish); + DUMP(info, PopStarted); + DUMP(info, IsTransformOutput); html << "EWatermarksMode: " << NDqProto::EWatermarksMode_Name(info.WatermarksMode) << "<br />"; if (info.AsyncData) { @@ -324,21 +341,7 @@ private: html << "DqInputChannel.OutputType: " << (info.Channel->GetOutputType() ? info.Channel->GetOutputType()->GetKindAsStr() : TString{"unknown"}) << "<br />"; const auto& pushStats = info.Channel->GetPushStats(); - html << "DqOutputChannel.PushStats.MaxRowsInMemory: " << pushStats.MaxRowsInMemory << "<br />"; - html << "DqOutputChannel.PushStats.MaxMemoryUsage: " << pushStats.MaxMemoryUsage << "<br />"; - html << "DqOutputChannel.PushStats.Level: " << static_cast<int>(pushStats.Level) << "<br />"; - html << "DqOutputChannel.PushStats.MinWaitDuration: " << pushStats.MinWaitDuration.ToString() << "<br />"; - html << "DqOutputChannel.PushStats.CurrentPauseTs: " << (pushStats.CurrentPauseTs ? pushStats.CurrentPauseTs->ToString() : TString{}) << "<br />"; - html << "DqOutputChannel.PushStats.MergeWaitPeriod: " << pushStats.MergeWaitPeriod << "<br />"; - html << "DqOutputChannel.PushStats.Bytes: " << pushStats.Bytes << "<br />"; - html << "DqOutputChannel.PushStats.Rows: " << pushStats.Rows << "<br />"; - html << "DqOutputChannel.PushStats.Chunks: " << pushStats.Chunks << "<br />"; - html << "DqOutputChannel.PushStats.Splits: " << pushStats.Splits << "<br />"; - html << "DqOutputChannel.PushStats.FirstMessageTs: " << pushStats.FirstMessageTs.ToString() << "<br />"; - html << "DqOutputChannel.PushStats.PauseMessageTs: " << pushStats.PauseMessageTs.ToString() << "<br />"; - html << "DqOutputChannel.PushStats.ResumeMessageTs: " << pushStats.ResumeMessageTs.ToString() << "<br />"; - html << "DqOutputChannel.PushStats.LastMessageTs: " << pushStats.LastMessageTs.ToString() << "<br />"; - html << "DqOutputChannel.PushStats.WaitTime: " << pushStats.WaitTime.ToString() << "<br />"; + dumpOutputStats("DqOutputChannel.PushStats."sv, pushStats); const auto& popStats = info.Channel->GetPopStats(); html << "DqOutputChannel.PopStats.ChannelId: " << popStats.ChannelId << "<br />"; @@ -349,17 +352,58 @@ private: html << "DqOutputChannel.PopStats.SpilledBytes: " << popStats.SpilledBytes << "<br />"; html << "DqOutputChannel.PopStats.SpilledRows: " << popStats.SpilledRows << "<br />"; html << "DqOutputChannel.PopStats.SpilledBlobs: " << popStats.SpilledBlobs << "<br />"; - html << "DqOutputChannel.PopStats.Bytes: " << popStats.Bytes << "<br />"; - html << "DqOutputChannel.PopStats.Rows: " << popStats.Rows << "<br />"; - html << "DqOutputChannel.PopStats.Chunks: " << popStats.Chunks << "<br />"; - html << "DqOutputChannel.PopStats.Splits: " << popStats.Splits << "<br />"; - html << "DqOutputChannel.PopStats.FirstMessageTs: " << popStats.FirstMessageTs.ToString() << "<br />"; - html << "DqOutputChannel.PopStats.PauseMessageTs: " << popStats.PauseMessageTs.ToString() << "<br />"; - html << "DqOutputChannel.PopStats.ResumeMessageTs: " << popStats.ResumeMessageTs.ToString() << "<br />"; - html << "DqOutputChannel.PopStats.LastMessageTs: " << popStats.LastMessageTs.ToString() << "<br />"; - html << "DqOutputChannel.PopStats.WaitTime: " << popStats.WaitTime.ToString() << "<br />"; + dumpOutputStats("DqOutputChannel.PopStats."sv, popStats); + } + } + + html << "<h3>Sinks</h3>"; + for (const auto& [id, info]: SinksMap) { + html << "<h4>Sink Id: " << id << "</h4>"; + DUMP(info, Type); + DUMP(info, FreeSpaceBeforeSend); + DUMP(info, Finished); + DUMP(info, FinishIsAcknowledged); + DUMP(info, PopStarted); + if (info.Buffer) { + const auto& buffer = *info.Buffer; + html << "DqOutputBuffer.OutputIndex: " << buffer.GetOutputIndex() << "<br />"; + html << "DqOutputBuffer.IsFull: " << buffer.IsFull() << "<br />"; + html << "DqOutputBuffer.OutputType: " << (buffer.GetOutputType() ? buffer.GetOutputType()->GetKindAsStr() : TString{"unknown"}) << "<br />"; + html << "DqOutputBuffer.IsFinished: " << buffer.IsFinished() << "<br />"; + html << "DqOutputBuffer.HasData: " << buffer.HasData() << "<br />"; + + const auto& pushStats = buffer.GetPushStats(); + dumpOutputStats("DqOutputBuffer.PushStats."sv, pushStats); + + const auto& popStats = buffer.GetPopStats(); + dumpOutputStats("DqOutputBuffer.PopStats."sv, popStats); + } + if (info.AsyncOutput) { + const auto& output = *info.AsyncOutput; + html << "AsyncOutput.OutputIndex: " << output.GetOutputIndex() << "<br />"; + html << "AsyncOutput.FreeSpace: " << output.GetFreeSpace() << "<br />"; + const auto& egressStats = output.GetEgressStats(); + dumpAsyncStats("AsyncOutput.EgressStats."sv, egressStats); + } + } + + html << "<h3>Sources</h3>"; + for (const auto& [id, info]: SourcesMap) { + html << "<h4>Source Id: " << id << "</h4>"; + DUMP(info, Type); + DUMP(info, LogPrefix); + DUMP(info, Index); + DUMP(info, Finished); + html << "IsPausedByWatermark: " << info.IsPausedByWatermark() << "<br />"; + html << "FreeSpace: " << info.GetFreeSpace() << "<br />"; + if (info.AsyncInput) { + const auto& input = *info.AsyncInput; + html << "AsyncInput.InputIndex: " << input.GetInputIndex() << "<br />"; + const auto& ingressStats = input.GetIngressStats(); + dumpAsyncStats("AsyncInput.IngressStats."sv, ingressStats); } } +#undef DUMP Send(ev->Sender, new NActors::NMon::TEvHttpInfoRes(html.Str())); } diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_input_helper.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_input_helper.h index 0e7ad1bff7c..22aa7e6f586 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_input_helper.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_input_helper.h @@ -35,7 +35,7 @@ public: , IssuesBuffer(IssuesBufferSize) , WatermarksMode(watermarksMode) {} - bool IsPausedByWatermark() { + bool IsPausedByWatermark() const { return PendingWatermark.Defined(); } diff --git a/ydb/library/yql/dq/runtime/dq_async_output.h b/ydb/library/yql/dq/runtime/dq_async_output.h index d399080dfa4..736b2d7b2c3 100644 --- a/ydb/library/yql/dq/runtime/dq_async_output.h +++ b/ydb/library/yql/dq/runtime/dq_async_output.h @@ -11,8 +11,6 @@ namespace NYql::NDq { struct TDqAsyncOutputBufferStats : TDqOutputStats { ui64 OutputIndex = 0; TString Type; - ui64 MaxMemoryUsage = 0; - ui64 MaxRowsInMemory = 0; }; class IDqAsyncOutputBuffer : public IDqOutput { |