diff options
author | hor911 <hor911@ydb.tech> | 2023-06-08 17:48:57 +0300 |
---|---|---|
committer | hor911 <hor911@ydb.tech> | 2023-06-08 17:48:57 +0300 |
commit | 6a06e462fa9b0d494944d1b5ea5630160f6a7391 (patch) | |
tree | e87cc8219f3e0056c6fb41b8db53c134910f876d | |
parent | 2628f63cd297b95651c7f2e58237faea8ffd0c33 (diff) | |
download | ydb-6a06e462fa9b0d494944d1b5ea5630160f6a7391.tar.gz |
Detailed channel statistics
14 files changed, 205 insertions, 54 deletions
diff --git a/ydb/core/fq/libs/control_plane_storage/internal/utils.cpp b/ydb/core/fq/libs/control_plane_storage/internal/utils.cpp index 776cdcdcd73..00f2f8d5593 100644 --- a/ydb/core/fq/libs/control_plane_storage/internal/utils.cpp +++ b/ydb/core/fq/libs/control_plane_storage/internal/utils.cpp @@ -122,7 +122,7 @@ std::vector<TString> GetMeteringRecords(const TString& statistics, bool billable if (NJson::ReadJsonTree(statistics, &jsonConfig, &stat)) { for (const auto& p : stat.GetMap()) { if (p.first.StartsWith("Graph=") || p.first.StartsWith("Precompute=")) { - if (auto* ingressNode = p.second.GetValueByPath("TaskRunner.Stage=Total.IngressS3SourceBytes.count")) { + if (auto* ingressNode = p.second.GetValueByPath("TaskRunner.Stage=Total.IngressS3SourceBytes.sum")) { ingress += ingressNode->GetInteger(); } } diff --git a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp index 8626a73d82f..558087acf71 100644 --- a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp +++ b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp @@ -221,10 +221,8 @@ private: outputChannel.PopStarted = true; ProcessOutputsState.Inflight++; + UpdateBlocked(outputChannel, toSend); if (toSend <= 0) { - if (Y_UNLIKELY(outputChannel.Stats)) { - outputChannel.Stats->BlockedByCapacity++; - } CA_LOG_T("Can not drain channel because it is blocked by capacity. ChannelId: " << channelId << ". To send: " << toSend << ". Free space: " << peerState.PeerFreeSpace diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_channels.cpp b/ydb/library/yql/dq/actors/compute/dq_compute_actor_channels.cpp index 289f0fa0aaa..ab6fc86f91d 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_channels.cpp +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_channels.cpp @@ -105,6 +105,14 @@ void TDqComputeActorChannels::HandleWork(TEvDqCompute::TEvChannelData::TPtr& ev) TInputChannelState& inputChannel = InCh(channelId); + if (Y_UNLIKELY(inputChannel.Stats)) { + auto now = Now(); + if (inputChannel.Stats->FirstMessageTs == TInstant::Zero()) { + inputChannel.Stats->FirstMessageTs = now; + } + inputChannel.Stats->LastMessageTs = now; + } + LOG_T("Received input for channelId: " << channelId << ", seqNo: " << record.GetSeqNo() << ", size: " << channelData.GetData().GetRaw().size() @@ -148,7 +156,12 @@ void TDqComputeActorChannels::HandleWork(TEvDqCompute::TEvChannelData::TPtr& ev) if (inputChannel.PollRequest && inputChannel.PollRequest->SeqNo <= record.GetSeqNo()) { if (Y_UNLIKELY(inputChannel.Stats)) { - inputChannel.Stats->WaitTime += TInstant::Now() - *inputChannel.StartPollTime; + auto waitTime = TInstant::Now() - *inputChannel.StartPollTime; + if (inputChannel.Stats->FirstMessageTs == TInstant::Zero()) { + inputChannel.Stats->IdleTime += waitTime; + } else { + inputChannel.Stats->WaitTime += waitTime; + } inputChannel.StartPollTime.reset(); } inputChannel.PollRequest.reset(); @@ -250,6 +263,11 @@ void TDqComputeActorChannels::HandleWork(TEvDqCompute::TEvRetryChannelData::TPtr if (Y_UNLIKELY(outputChannel.Stats)) { outputChannel.Stats->ResentMessages++; + auto now = Now(); + if (outputChannel.Stats->FirstMessageTs == TInstant::Zero()) { + outputChannel.Stats->FirstMessageTs = now; + } + outputChannel.Stats->LastMessageTs = now; } auto retryEv = MakeHolder<TEvDqCompute::TEvChannelData>(); @@ -562,6 +580,14 @@ void TDqComputeActorChannels::SendChannelData(NDqProto::TChannelData&& channelDa << ", seqNo: " << seqNo << ", finished: " << finished); + if (Y_UNLIKELY(outputChannel.Stats)) { + auto now = Now(); + if (outputChannel.Stats->FirstMessageTs == TInstant::Zero()) { + outputChannel.Stats->FirstMessageTs = now; + } + outputChannel.Stats->LastMessageTs = now; + } + auto dataEv = MakeHolder<TEvDqCompute::TEvChannelData>(); dataEv->Record.SetSeqNo(seqNo); dataEv->Record.SetSendTime(Now().MilliSeconds()); diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_channels.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_channels.h index 6a2330f21ec..2f30111320e 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_channels.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_channels.h @@ -31,11 +31,16 @@ public: struct TInputChannelStats { ui64 PollRequests = 0; ui64 ResentMessages = 0; - TDuration WaitTime; + TDuration IdleTime; // wait time until 1st message received + TDuration WaitTime; // wait time after 1st message received + TInstant FirstMessageTs; + TInstant LastMessageTs; }; struct TOutputChannelStats { ui64 ResentMessages = 0; + TInstant FirstMessageTs; + TInstant LastMessageTs; }; public: diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h index 19623a925e2..ac05aaf117e 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h @@ -905,6 +905,7 @@ protected: struct TInputChannelInfo { const TString LogPrefix; ui64 ChannelId; + ui32 SrcStageId; IDqInputChannel::TPtr Channel; bool HasPeer = false; std::queue<TInstant> PendingWatermarks; @@ -916,10 +917,12 @@ protected: explicit TInputChannelInfo( const TString& logPrefix, ui64 channelId, + ui32 srcStageId, NDqProto::EWatermarksMode watermarksMode, NDqProto::ECheckpointingMode checkpointingMode) : LogPrefix(logPrefix) , ChannelId(channelId) + , SrcStageId(srcStageId) , WatermarksMode(watermarksMode) , CheckpointingMode(checkpointingMode) { @@ -1009,6 +1012,7 @@ protected: struct TOutputChannelInfo { ui64 ChannelId; + ui32 DstStageId; IDqOutputChannel::TPtr Channel; bool HasPeer = false; bool Finished = false; // != Channel->IsFinished() // If channel is in finished state, it sends only checkpoints. @@ -1016,13 +1020,16 @@ protected: bool IsTransformOutput = false; // Is this channel output of a transform. NDqProto::EWatermarksMode WatermarksMode = NDqProto::EWatermarksMode::WATERMARKS_MODE_DISABLED; - explicit TOutputChannelInfo(ui64 channelId) - : ChannelId(channelId) + TOutputChannelInfo(ui64 channelId, ui32 dstStageId) + : ChannelId(channelId), DstStageId(dstStageId) { } struct TStats { ui64 BlockedByCapacity = 0; ui64 NoDstActorId = 0; + TDuration BlockedTime; + std::optional<TInstant> StartBlockedTime; + }; THolder<TStats> Stats; @@ -1280,6 +1287,24 @@ protected: return allowedOvercommit; } +protected: + + void UpdateBlocked(TOutputChannelInfo& outputChannel, i64 toSend) { + if (Y_UNLIKELY(outputChannel.Stats)) { + if (toSend <= 0) { + outputChannel.Stats->BlockedByCapacity++; + if (!outputChannel.Stats->StartBlockedTime) { + outputChannel.Stats->StartBlockedTime = TInstant::Now(); + } + } else { + if (outputChannel.Stats->StartBlockedTime) { + outputChannel.Stats->BlockedTime += TInstant::Now() - *outputChannel.Stats->StartBlockedTime; + outputChannel.Stats->StartBlockedTime.reset(); + } + } + } + } + private: virtual const TDqMemoryQuota::TProfileStats* GetProfileStats() const { Y_VERIFY(MemoryQuota); @@ -1309,11 +1334,7 @@ private: ProcessOutputsState.HasDataToSend |= !outputChannel.Finished; ProcessOutputsState.AllOutputsFinished &= outputChannel.Finished; - if (toSend <= 0) { - if (Y_UNLIKELY(outputChannel.Stats)) { - outputChannel.Stats->BlockedByCapacity++; - } - } + UpdateBlocked(outputChannel, toSend); i64 remains = toSend; while (remains > 0 && (!outputChannel.Finished || Checkpoints)) { @@ -1811,6 +1832,7 @@ private: TInputChannelInfo( LogPrefix, channel.GetId(), + channel.GetSrcStageId(), channel.GetWatermarksMode(), channel.GetCheckpointingMode()) ); @@ -1833,7 +1855,7 @@ private: YQL_ENSURE(result.second); } else { for (auto& channel : outputDesc.GetChannels()) { - TOutputChannelInfo outputChannel(channel.GetId()); + TOutputChannelInfo outputChannel(channel.GetId(), channel.GetDstStageId()); outputChannel.HasPeer = channel.GetDstEndpoint().HasActorId(); outputChannel.IsTransformOutput = outputDesc.HasTransform(); outputChannel.WatermarksMode = channel.GetWatermarksMode(); @@ -2034,18 +2056,28 @@ public: protoInputChannelStats.SetPollRequests(caChannelStats->PollRequests); protoInputChannelStats.SetWaitTimeUs(caChannelStats->WaitTime.MicroSeconds()); protoInputChannelStats.SetResentMessages(caChannelStats->ResentMessages); + protoInputChannelStats.SetFirstMessageMs(caChannelStats->FirstMessageTs.MilliSeconds()); + protoInputChannelStats.SetLastMessageMs(caChannelStats->LastMessageTs.MilliSeconds()); + } + + if (auto* inputInfo = InputChannelsMap.FindPtr(protoInputChannelStats.GetChannelId())) { + protoInputChannelStats.SetSrcStageId(inputInfo->SrcStageId); } } for (auto& protoOutputChannelStats : *protoTask->MutableOutputChannels()) { - if (auto* x = Channels->GetOutputChannelStats(protoOutputChannelStats.GetChannelId())) { - protoOutputChannelStats.SetResentMessages(x->ResentMessages); + if (auto* caChannelStats = Channels->GetOutputChannelStats(protoOutputChannelStats.GetChannelId())) { + protoOutputChannelStats.SetResentMessages(caChannelStats->ResentMessages); + protoOutputChannelStats.SetFirstMessageMs(caChannelStats->FirstMessageTs.MilliSeconds()); + protoOutputChannelStats.SetLastMessageMs(caChannelStats->LastMessageTs.MilliSeconds()); } if (auto* outputInfo = OutputChannelsMap.FindPtr(protoOutputChannelStats.GetChannelId())) { - if (auto *x = outputInfo->Stats.Get()) { - protoOutputChannelStats.SetBlockedByCapacity(x->BlockedByCapacity); - protoOutputChannelStats.SetNoDstActorId(x->NoDstActorId); + protoOutputChannelStats.SetDstStageId(outputInfo->DstStageId); + if (auto *outputChannelStats = outputInfo->Stats.Get()) { + protoOutputChannelStats.SetBlockedTimeUs(outputChannelStats->BlockedTime.MicroSeconds()); + protoOutputChannelStats.SetBlockedByCapacity(outputChannelStats->BlockedByCapacity); + protoOutputChannelStats.SetNoDstActorId(outputChannelStats->NoDstActorId); } } } @@ -2060,6 +2092,8 @@ public: if (last && MemoryQuota) { MemoryQuota->ResetProfileStats(); } + + // Cerr << "STAAT\n" << dst->DebugString() << Endl; } protected: diff --git a/ydb/library/yql/dq/actors/protos/dq_stats.proto b/ydb/library/yql/dq/actors/protos/dq_stats.proto index d47176b2a49..a8ac09a2106 100644 --- a/ydb/library/yql/dq/actors/protos/dq_stats.proto +++ b/ydb/library/yql/dq/actors/protos/dq_stats.proto @@ -33,6 +33,7 @@ message TDqAsyncInputBufferStats { message TDqInputChannelStats { // basic stats uint64 ChannelId = 1; + uint32 SrcStageId = 20; uint64 Chunks = 2; uint64 Bytes = 3; uint64 RowsIn = 4; @@ -44,7 +45,11 @@ message TDqInputChannelStats { uint32 PollRequests = 8; uint32 ResentMessages = 9; - uint64 WaitTimeUs = 10; + uint64 IdleTimeUs = 11; // wait time until 1st message received + uint64 WaitTimeUs = 10; // wait time after 1st message received + + uint64 FirstMessageMs = 30; + uint64 LastMessageMs = 31; google.protobuf.Any Extra = 100; } @@ -69,6 +74,7 @@ message TDqAsyncOutputBufferStats { message TDqOutputChannelStats { // basic stats uint64 ChannelId = 1; + uint32 DstStageId = 20; uint64 Chunks = 2; uint64 Bytes = 3; uint64 RowsIn = 4; @@ -86,6 +92,11 @@ message TDqOutputChannelStats { uint64 SpilledRows = 13; uint64 SpilledBlobs = 14; + uint64 BlockedTimeUs = 15; + + uint64 FirstMessageMs = 30; + uint64 LastMessageMs = 31; + google.protobuf.Any Extra = 100; } diff --git a/ydb/library/yql/dq/proto/dq_tasks.proto b/ydb/library/yql/dq/proto/dq_tasks.proto index da6b4e5df06..bad0b318533 100644 --- a/ydb/library/yql/dq/proto/dq_tasks.proto +++ b/ydb/library/yql/dq/proto/dq_tasks.proto @@ -72,6 +72,8 @@ message TChannel { uint64 Id = 1; EDataTransportVersion TransportVersion = 2; + uint32 SrcStageId = 11; + uint32 DstStageId = 12; uint64 SrcTaskId = 3; uint64 DstTaskId = 4; TEndpoint SrcEndpoint = 5; diff --git a/ydb/library/yql/dq/tasks/dq_connection_builder.h b/ydb/library/yql/dq/tasks/dq_connection_builder.h index 14c4f871f31..7401576c613 100644 --- a/ydb/library/yql/dq/tasks/dq_connection_builder.h +++ b/ydb/library/yql/dq/tasks/dq_connection_builder.h @@ -59,8 +59,10 @@ void BuildUnionAllChannels(TGraph& graph, const typename TGraph::TStageInfoType& auto& originTask = graph.GetTask(originTaskId); auto& channel = graph.AddChannel(); + channel.SrcStageId = inputStageInfo.Id; channel.SrcTask = originTaskId; channel.SrcOutputIndex = outputIndex; + channel.DstStageId = stageInfo.Id; channel.DstTask = targetTask.Id; channel.DstInputIndex = inputIndex; channel.InMemory = !enableSpilling || inputStageInfo.OutputsCount == 1; @@ -108,8 +110,10 @@ void BuildHashShuffleChannels(TGraph& graph, const typename TGraph::TStageInfoTy auto& targetTask = graph.GetTask(targetTaskId); auto& channel = graph.AddChannel(); + channel.SrcStageId = inputStageInfo.Id; channel.SrcTask = originTask.Id; channel.SrcOutputIndex = outputIndex; + channel.DstStageId = stageInfo.Id; channel.DstTask = targetTask.Id; channel.DstInputIndex = inputIndex; channel.InMemory = !enableSpilling || inputStageInfo.OutputsCount == 1; @@ -156,8 +160,10 @@ void BuildMapChannels(TGraph& graph, const typename TGraph::TStageInfoType& stag auto targetTaskId = targetTasks[i]; auto& channel = graph.AddChannel(); + channel.SrcStageId = inputStageInfo.Id; channel.SrcTask = originTaskId; channel.SrcOutputIndex = outputIndex; + channel.DstStageId = stageInfo.Id; channel.DstTask = targetTaskId; channel.DstInputIndex = inputIndex; channel.InMemory = !enableSpilling || inputStageInfo.OutputsCount == 1; @@ -207,8 +213,10 @@ void BuildBroadcastChannels(TGraph& graph, const typename TGraph::TStageInfoType auto targetTaskId = targetTasks[i]; auto& channel = graph.AddChannel(); + channel.SrcStageId = inputStageInfo.Id; channel.SrcTask = originTaskId; channel.SrcOutputIndex = outputIndex; + channel.DstStageId = stageInfo.Id; channel.DstTask = targetTaskId; channel.DstInputIndex = inputIndex; channel.InMemory = !enableSpilling || inputStageInfo.OutputsCount == 1; @@ -248,8 +256,10 @@ void BuildMergeChannels(TGraph& graph, const typename TGraph::TStageInfoType& st auto& originTask = graph.GetTask(originTaskId); auto& channel = graph.AddChannel(); + channel.SrcStageId = inputStageInfo.Id; channel.SrcTask = originTaskId; channel.SrcOutputIndex = outputIndex; + channel.DstStageId = stageInfo.Id; channel.DstTask = targetTask.Id; channel.DstInputIndex = inputIndex; channel.InMemory = true; diff --git a/ydb/library/yql/dq/tasks/dq_tasks_graph.h b/ydb/library/yql/dq/tasks/dq_tasks_graph.h index ca5ddbe86b7..4d6aeb1d94c 100644 --- a/ydb/library/yql/dq/tasks/dq_tasks_graph.h +++ b/ydb/library/yql/dq/tasks/dq_tasks_graph.h @@ -15,6 +15,7 @@ struct TStageId { ui64 TxId = 0; ui64 StageId = 0; + TStageId() = default; TStageId(ui64 txId, ui64 stageId) : TxId(txId) , StageId(stageId) {} @@ -79,13 +80,17 @@ struct TStageInfo : private TMoveOnly { struct TChannel { ui64 Id = 0; + TStageId SrcStageId; ui64 SrcTask = 0; ui32 SrcOutputIndex = 0; + TStageId DstStageId; ui64 DstTask = 0; ui32 DstInputIndex = 0; bool InMemory = true; NDqProto::ECheckpointingMode CheckpointingMode = NDqProto::CHECKPOINTING_MODE_DEFAULT; NDqProto::EWatermarksMode WatermarksMode = NDqProto::WATERMARKS_MODE_DISABLED; + + TChannel() = default; }; using TChannelList = TVector<ui64>; @@ -280,7 +285,8 @@ public: } TChannel& AddChannel() { - TChannel channel{Channels.size() + 1}; + TChannel channel; + channel.Id = Channels.size() + 1; return Channels.emplace_back(channel); } diff --git a/ydb/library/yql/providers/dq/actors/task_controller_impl.h b/ydb/library/yql/providers/dq/actors/task_controller_impl.h index 667f4197d5f..d032d2fd7d8 100644 --- a/ydb/library/yql/providers/dq/actors/task_controller_impl.h +++ b/ydb/library/yql/providers/dq/actors/task_controller_impl.h @@ -295,9 +295,9 @@ private: auto& counter = *ServiceCounters.PublicCounters->GetNamedCounter("name", publicCounterName, isDeriv); if (name == "MultiHop_LateThrownEventsCount") { // the only incremental sensor from TaskRunner - counter += v.Count; + counter += v.Sum; } else { - counter = v.Count; + counter = v.Sum; } } } @@ -409,6 +409,7 @@ private: for (const auto& stats : s.GetInputChannels()) { auto labels = commonLabels; labels["InputChannel"] = ToString(stats.GetChannelId()); + labels["SrcStageId"] = ToString(stats.GetSrcStageId()); ADD_COUNTER(Chunks); ADD_COUNTER(Bytes); @@ -417,6 +418,18 @@ private: ADD_COUNTER(MaxMemoryUsage); ADD_COUNTER(DeserializationTimeUs); + ADD_COUNTER(IdleTimeUs); + ADD_COUNTER(WaitTimeUs); + ADD_COUNTER(FirstMessageMs); + ADD_COUNTER(LastMessageMs); + + if (stats.GetFirstMessageMs() && stats.GetLastMessageMs()) { + TaskStat.SetCounter(TaskStat.GetCounterName("TaskRunner", labels, "ActiveTimeUs"), + ( TInstant::MilliSeconds(stats.GetLastMessageMs()) - + TInstant::MilliSeconds(stats.GetFirstMessageMs()) ).MicroSeconds() + ); + } + // if (stats.GetFinishTs() >= stats.GetStartTs()) { // TaskStat.SetCounter(TaskStat.GetCounterName("TaskRunner", labels, "Total"), stats.GetFinishTs() - stats.GetStartTs()); // } @@ -425,6 +438,7 @@ private: for (const auto& stats : s.GetOutputChannels()) { auto labels = commonLabels; labels["OutputChannel"] = ToString(stats.GetChannelId()); + labels["DstStageId"] = ToString(stats.GetDstStageId()); ADD_COUNTER(Chunks) ADD_COUNTER(Bytes); @@ -439,6 +453,17 @@ private: ADD_COUNTER(SpilledRows); ADD_COUNTER(SpilledBlobs); + ADD_COUNTER(BlockedTimeUs); + ADD_COUNTER(FirstMessageMs); + ADD_COUNTER(LastMessageMs); + + if (stats.GetFirstMessageMs() && stats.GetLastMessageMs()) { + TaskStat.SetCounter(TaskStat.GetCounterName("TaskRunner", labels, "ActiveTimeUs"), + ( TInstant::MilliSeconds(stats.GetLastMessageMs()) - + TInstant::MilliSeconds(stats.GetFirstMessageMs()) ).MicroSeconds() + ); + } + // if (stats.GetFinishTs() >= stats.GetStartTs()) { // TaskStat.SetCounter(TaskStat.GetCounterName("TaskRunner", labels, "Total"), stats.GetFinishTs() - stats.GetStartTs()); // } diff --git a/ydb/library/yql/providers/dq/counters/counters.cpp b/ydb/library/yql/providers/dq/counters/counters.cpp index 87b39735d0f..df3171b0043 100644 --- a/ydb/library/yql/providers/dq/counters/counters.cpp +++ b/ydb/library/yql/providers/dq/counters/counters.cpp @@ -49,8 +49,6 @@ TCounters AggregateQueryStatsByStage(TCounters& queryStat, const THashMap<ui64, std::map<TString, TString> labels; TString prefix, name; if (k.StartsWith("TaskRunner") && NCommon::ParseCounterName(&prefix, &labels, &name, k)) { - auto maybeInputChannel = labels.find("InputChannel"); - auto maybeOutputChannel = labels.find("OutputChannel"); auto maybeTask = labels.find("Task"); if (maybeTask == labels.end()) { aggregatedQueryStat.AddCounter(k, v); @@ -65,29 +63,56 @@ TCounters AggregateQueryStatsByStage(TCounters& queryStat, const THashMap<ui64, ? "0" : ToString(maybeStage->second); ui64 channelId; + bool input = false; + bool output = false; + auto maybeInputChannel = labels.find("InputChannel"); if (maybeInputChannel != labels.end()) { if (!TryFromString(maybeInputChannel->second, channelId)) { continue; } + ui32 stage = 0; + auto maybeSrcStageId = labels.find("SrcStageId"); + if (maybeSrcStageId != labels.end()) { + TryFromString(maybeSrcStageId->second, stage); + labels.erase(maybeSrcStageId); + } stage2Input[stageId].insert(channelId); stage2Input["Total"].insert(channelId); labels.erase(maybeInputChannel); - labels["Input"] = "1"; + labels["Input"] = ToString(stage); + input = true; } + auto maybeOutputChannel = labels.find("OutputChannel"); if (maybeOutputChannel != labels.end()) { if (!TryFromString(maybeOutputChannel->second, channelId)) { continue; } + ui32 stage = 0; + auto maybeDstStageId = labels.find("DstStageId"); + if (maybeDstStageId != labels.end()) { + TryFromString(maybeDstStageId->second, stage); + labels.erase(maybeDstStageId); + } stage2Output[stageId].insert(channelId); stage2Output["Total"].insert(channelId); labels.erase(maybeOutputChannel); - labels["Output"] = "1"; + labels["Output"] = ToString(stage); + output = true; } labels.erase(maybeTask); labels["Stage"] = ToString(stageId); stage2Tasks[stageId].insert(taskId); stage2Tasks["Total"].insert(taskId); aggregatedQueryStat.AddCounter(queryStat.GetCounterName("TaskRunner", labels, name), v); + if (input || output) { + if (input) { + labels["Input"] = "Total"; + } + if (output) { + labels["Output"] = "Total"; + } + aggregatedQueryStat.AddCounter(queryStat.GetCounterName("TaskRunner", labels, name), v); + } labels["Stage"] = "Total"; aggregatedQueryStat.AddCounter(queryStat.GetCounterName("TaskRunner", labels, name), v); } else { @@ -124,11 +149,11 @@ TCounters AggregateQueryStatsByStage(TCounters& queryStat, const THashMap<ui64, } for (const auto& [stageId, v] : stage2Input) { aggregatedQueryStat.AddCounter(queryStat.GetCounterName("TaskRunner", - {{"Stage", stageId},{"Input", "1"}}, "ChannelsCount"), static_cast<ui64>(v.size())); + {{"Stage", stageId},{"Input", "Total"}}, "ChannelsCount"), static_cast<ui64>(v.size())); } - for (const auto& [stageId, v] : stage2Input) { + for (const auto& [stageId, v] : stage2Output) { aggregatedQueryStat.AddCounter(queryStat.GetCounterName("TaskRunner", - {{"Stage", stageId},{"Output", "1"}}, "ChannelsCount"), static_cast<ui64>(v.size())); + {{"Stage", stageId},{"Output", "Total"}}, "ChannelsCount"), static_cast<ui64>(v.size())); } aggregatedQueryStat.AddCounter("StagesCount", static_cast<ui64>(stage2Tasks.size())); diff --git a/ydb/library/yql/providers/dq/counters/counters.h b/ydb/library/yql/providers/dq/counters/counters.h index ec05baa82da..de37fc7a317 100644 --- a/ydb/library/yql/providers/dq/counters/counters.h +++ b/ydb/library/yql/providers/dq/counters/counters.h @@ -56,14 +56,12 @@ struct TCounters { template<typename T> void AddCounter(const TString& name, T value) const { - auto& counter = Counters[name]; - counter.Count += value; + Counters[name].Add(TEntry(value)); } template<typename T> void SetCounter(const TString& name, T value) const { - auto& counter = Counters[name]; - counter.Count = value; + Counters[name] = TEntry(value); } THashMap<i64, ui64>& GetHistogram(const TString& name) { @@ -142,6 +140,25 @@ struct TCounters { i64 Min = 0; i64 Avg = 0; i64 Count = 0; + + TEntry() = default; + explicit TEntry(i64 value) { + Sum = value; + Max = value; + Min = value; + Avg = value; + Count = 1; + } + + void Add(const TEntry& entry) { + if (entry.Count) { + Sum += entry.Sum; + Min = (Count == 0) ? entry.Min : ::Min(Min, entry.Min); + Max = (Count == 0) ? entry.Max : ::Max(Max, entry.Max); + Count += entry.Count; + Avg = Sum / Count; + } + } }; struct TCounterBlock { @@ -171,18 +188,7 @@ struct TCounters { } void AddCounter(const TString& name, const TEntry& value) const { - auto& counter = Counters[name]; - if (value.Count) { - counter.Sum += value.Sum; - counter.Min = counter.Count == 0 - ? value.Min - : Min(counter.Min, value.Min); - counter.Max = counter.Count == 0 - ? value.Max - : Max(counter.Max, value.Max); - counter.Count += value.Count; - counter.Avg = counter.Sum / counter.Count; - } + Counters[name].Add(value); } void Clear() const { diff --git a/ydb/library/yql/providers/dq/planner/execution_planner.cpp b/ydb/library/yql/providers/dq/planner/execution_planner.cpp index dbec17dec57..89d7cc995bc 100644 --- a/ydb/library/yql/providers/dq/planner/execution_planner.cpp +++ b/ydb/library/yql/providers/dq/planner/execution_planner.cpp @@ -407,7 +407,7 @@ namespace NYql::NDqs { tasks[i].ComputeActorId = workers[i]; } - THashMap<TStageId, std::tuple<TString, ui64, ui64>> stagePrograms = BuildAllPrograms(); + BuildAllPrograms(); TVector<TDqTask> plan; THashSet<TString> clusterNameHints; for (const auto& task : tasks) { @@ -450,7 +450,7 @@ namespace NYql::NDqs { program.SetRuntimeVersion(NYql::NDqProto::ERuntimeVersion::RUNTIME_VERSION_YQL_1_0); TString programStr; ui64 stageId, publicId; - std::tie(programStr, stageId, publicId) = stagePrograms[task.StageId]; + std::tie(programStr, stageId, publicId) = StagePrograms[task.StageId]; program.SetRaw(programStr); taskMeta.SetStageId(publicId); taskDesc.MutableMeta()->PackFrom(taskMeta); @@ -589,10 +589,11 @@ namespace NYql::NDqs { #undef BUILD_CONNECTION -THashMap<TStageId, std::tuple<TString,ui64,ui64>> TDqsExecutionPlanner::BuildAllPrograms() { +void TDqsExecutionPlanner::BuildAllPrograms() { using namespace NKikimr::NMiniKQL; - THashMap<TStageId, std::tuple<TString,ui64,ui64>> result; + StagePrograms.clear(); + TScopedAlloc alloc(__LOCATION__, NKikimr::TAlignedPagePoolCounters(), FunctionRegistry->SupportsSizedAllocators()); TTypeEnvironment typeEnv(alloc); TVector<NNodes::TExprBase> fakeReads; @@ -623,18 +624,18 @@ THashMap<TStageId, std::tuple<TString,ui64,ui64>> TDqsExecutionPlanner::BuildAll Y_VERIFY(false); } */ - result[stageInfo.first] = std::make_tuple( + StagePrograms[stageInfo.first] = std::make_tuple( NDq::BuildProgram( stage.Program(), *paramsType, compiler, typeEnv, *FunctionRegistry, ExprContext, fakeReads), stageId, publicId); } - - return result; } void TDqsExecutionPlanner::FillChannelDesc(NDqProto::TChannel& channelDesc, const NDq::TChannel& channel) { channelDesc.SetId(channel.Id); + channelDesc.SetSrcStageId(std::get<2>(StagePrograms[channel.SrcStageId])); + channelDesc.SetDstStageId(std::get<2>(StagePrograms[channel.DstStageId])); channelDesc.SetSrcTaskId(channel.SrcTask); channelDesc.SetDstTaskId(channel.DstTask); channelDesc.SetCheckpointingMode(channel.CheckpointingMode); @@ -785,6 +786,7 @@ THashMap<TStageId, std::tuple<TString,ui64,ui64>> TDqsExecutionPlanner::BuildAll auto channelDesc = outputDesc->AddChannels(); channelDesc->SetId(1); + channelDesc->SetSrcStageId(1); channelDesc->SetSrcTaskId(2); channelDesc->SetDstTaskId(1); diff --git a/ydb/library/yql/providers/dq/planner/execution_planner.h b/ydb/library/yql/providers/dq/planner/execution_planner.h index cacb6ea2c71..4c3abee30fd 100644 --- a/ydb/library/yql/providers/dq/planner/execution_planner.h +++ b/ydb/library/yql/providers/dq/planner/execution_planner.h @@ -67,7 +67,7 @@ namespace NYql::NDqs { private: bool BuildReadStage(const NNodes::TDqPhyStage& stage, bool dqSource, bool canFallback); void BuildConnections(const NNodes::TDqPhyStage& stage); - THashMap<NDq::TStageId, std::tuple<TString,ui64,ui64>> BuildAllPrograms(); + void BuildAllPrograms(); void FillChannelDesc(NDqProto::TChannel& channelDesc, const NDq::TChannel& channel); void FillInputDesc(NDqProto::TTaskInput& inputDesc, const TTaskInput& input); void FillOutputDesc(NDqProto::TTaskOutput& outputDesc, const TTaskOutput& output); @@ -93,6 +93,7 @@ namespace NYql::NDqs { TVector<NDqProto::TDqTask> Tasks; THashMap<ui64, ui32> PublicIds; + THashMap<NDq::TStageId, std::tuple<TString,ui64,ui64>> StagePrograms; }; // Execution planner for TRuntimeNode |