aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorhor911 <hor911@ydb.tech>2023-06-08 17:48:57 +0300
committerhor911 <hor911@ydb.tech>2023-06-08 17:48:57 +0300
commit6a06e462fa9b0d494944d1b5ea5630160f6a7391 (patch)
treee87cc8219f3e0056c6fb41b8db53c134910f876d
parent2628f63cd297b95651c7f2e58237faea8ffd0c33 (diff)
downloadydb-6a06e462fa9b0d494944d1b5ea5630160f6a7391.tar.gz
Detailed channel statistics
-rw-r--r--ydb/core/fq/libs/control_plane_storage/internal/utils.cpp2
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp4
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_compute_actor_channels.cpp28
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_compute_actor_channels.h7
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h60
-rw-r--r--ydb/library/yql/dq/actors/protos/dq_stats.proto13
-rw-r--r--ydb/library/yql/dq/proto/dq_tasks.proto2
-rw-r--r--ydb/library/yql/dq/tasks/dq_connection_builder.h10
-rw-r--r--ydb/library/yql/dq/tasks/dq_tasks_graph.h8
-rw-r--r--ydb/library/yql/providers/dq/actors/task_controller_impl.h29
-rw-r--r--ydb/library/yql/providers/dq/counters/counters.cpp39
-rw-r--r--ydb/library/yql/providers/dq/counters/counters.h38
-rw-r--r--ydb/library/yql/providers/dq/planner/execution_planner.cpp16
-rw-r--r--ydb/library/yql/providers/dq/planner/execution_planner.h3
14 files changed, 205 insertions, 54 deletions
diff --git a/ydb/core/fq/libs/control_plane_storage/internal/utils.cpp b/ydb/core/fq/libs/control_plane_storage/internal/utils.cpp
index 776cdcdcd73..00f2f8d5593 100644
--- a/ydb/core/fq/libs/control_plane_storage/internal/utils.cpp
+++ b/ydb/core/fq/libs/control_plane_storage/internal/utils.cpp
@@ -122,7 +122,7 @@ std::vector<TString> GetMeteringRecords(const TString& statistics, bool billable
if (NJson::ReadJsonTree(statistics, &jsonConfig, &stat)) {
for (const auto& p : stat.GetMap()) {
if (p.first.StartsWith("Graph=") || p.first.StartsWith("Precompute=")) {
- if (auto* ingressNode = p.second.GetValueByPath("TaskRunner.Stage=Total.IngressS3SourceBytes.count")) {
+ if (auto* ingressNode = p.second.GetValueByPath("TaskRunner.Stage=Total.IngressS3SourceBytes.sum")) {
ingress += ingressNode->GetInteger();
}
}
diff --git a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp
index 8626a73d82f..558087acf71 100644
--- a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp
+++ b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp
@@ -221,10 +221,8 @@ private:
outputChannel.PopStarted = true;
ProcessOutputsState.Inflight++;
+ UpdateBlocked(outputChannel, toSend);
if (toSend <= 0) {
- if (Y_UNLIKELY(outputChannel.Stats)) {
- outputChannel.Stats->BlockedByCapacity++;
- }
CA_LOG_T("Can not drain channel because it is blocked by capacity. ChannelId: " << channelId
<< ". To send: " << toSend
<< ". Free space: " << peerState.PeerFreeSpace
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_channels.cpp b/ydb/library/yql/dq/actors/compute/dq_compute_actor_channels.cpp
index 289f0fa0aaa..ab6fc86f91d 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_channels.cpp
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_channels.cpp
@@ -105,6 +105,14 @@ void TDqComputeActorChannels::HandleWork(TEvDqCompute::TEvChannelData::TPtr& ev)
TInputChannelState& inputChannel = InCh(channelId);
+ if (Y_UNLIKELY(inputChannel.Stats)) {
+ auto now = Now();
+ if (inputChannel.Stats->FirstMessageTs == TInstant::Zero()) {
+ inputChannel.Stats->FirstMessageTs = now;
+ }
+ inputChannel.Stats->LastMessageTs = now;
+ }
+
LOG_T("Received input for channelId: " << channelId
<< ", seqNo: " << record.GetSeqNo()
<< ", size: " << channelData.GetData().GetRaw().size()
@@ -148,7 +156,12 @@ void TDqComputeActorChannels::HandleWork(TEvDqCompute::TEvChannelData::TPtr& ev)
if (inputChannel.PollRequest && inputChannel.PollRequest->SeqNo <= record.GetSeqNo()) {
if (Y_UNLIKELY(inputChannel.Stats)) {
- inputChannel.Stats->WaitTime += TInstant::Now() - *inputChannel.StartPollTime;
+ auto waitTime = TInstant::Now() - *inputChannel.StartPollTime;
+ if (inputChannel.Stats->FirstMessageTs == TInstant::Zero()) {
+ inputChannel.Stats->IdleTime += waitTime;
+ } else {
+ inputChannel.Stats->WaitTime += waitTime;
+ }
inputChannel.StartPollTime.reset();
}
inputChannel.PollRequest.reset();
@@ -250,6 +263,11 @@ void TDqComputeActorChannels::HandleWork(TEvDqCompute::TEvRetryChannelData::TPtr
if (Y_UNLIKELY(outputChannel.Stats)) {
outputChannel.Stats->ResentMessages++;
+ auto now = Now();
+ if (outputChannel.Stats->FirstMessageTs == TInstant::Zero()) {
+ outputChannel.Stats->FirstMessageTs = now;
+ }
+ outputChannel.Stats->LastMessageTs = now;
}
auto retryEv = MakeHolder<TEvDqCompute::TEvChannelData>();
@@ -562,6 +580,14 @@ void TDqComputeActorChannels::SendChannelData(NDqProto::TChannelData&& channelDa
<< ", seqNo: " << seqNo
<< ", finished: " << finished);
+ if (Y_UNLIKELY(outputChannel.Stats)) {
+ auto now = Now();
+ if (outputChannel.Stats->FirstMessageTs == TInstant::Zero()) {
+ outputChannel.Stats->FirstMessageTs = now;
+ }
+ outputChannel.Stats->LastMessageTs = now;
+ }
+
auto dataEv = MakeHolder<TEvDqCompute::TEvChannelData>();
dataEv->Record.SetSeqNo(seqNo);
dataEv->Record.SetSendTime(Now().MilliSeconds());
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_channels.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_channels.h
index 6a2330f21ec..2f30111320e 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_channels.h
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_channels.h
@@ -31,11 +31,16 @@ public:
struct TInputChannelStats {
ui64 PollRequests = 0;
ui64 ResentMessages = 0;
- TDuration WaitTime;
+ TDuration IdleTime; // wait time until 1st message received
+ TDuration WaitTime; // wait time after 1st message received
+ TInstant FirstMessageTs;
+ TInstant LastMessageTs;
};
struct TOutputChannelStats {
ui64 ResentMessages = 0;
+ TInstant FirstMessageTs;
+ TInstant LastMessageTs;
};
public:
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
index 19623a925e2..ac05aaf117e 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
@@ -905,6 +905,7 @@ protected:
struct TInputChannelInfo {
const TString LogPrefix;
ui64 ChannelId;
+ ui32 SrcStageId;
IDqInputChannel::TPtr Channel;
bool HasPeer = false;
std::queue<TInstant> PendingWatermarks;
@@ -916,10 +917,12 @@ protected:
explicit TInputChannelInfo(
const TString& logPrefix,
ui64 channelId,
+ ui32 srcStageId,
NDqProto::EWatermarksMode watermarksMode,
NDqProto::ECheckpointingMode checkpointingMode)
: LogPrefix(logPrefix)
, ChannelId(channelId)
+ , SrcStageId(srcStageId)
, WatermarksMode(watermarksMode)
, CheckpointingMode(checkpointingMode)
{
@@ -1009,6 +1012,7 @@ protected:
struct TOutputChannelInfo {
ui64 ChannelId;
+ ui32 DstStageId;
IDqOutputChannel::TPtr Channel;
bool HasPeer = false;
bool Finished = false; // != Channel->IsFinished() // If channel is in finished state, it sends only checkpoints.
@@ -1016,13 +1020,16 @@ protected:
bool IsTransformOutput = false; // Is this channel output of a transform.
NDqProto::EWatermarksMode WatermarksMode = NDqProto::EWatermarksMode::WATERMARKS_MODE_DISABLED;
- explicit TOutputChannelInfo(ui64 channelId)
- : ChannelId(channelId)
+ TOutputChannelInfo(ui64 channelId, ui32 dstStageId)
+ : ChannelId(channelId), DstStageId(dstStageId)
{ }
struct TStats {
ui64 BlockedByCapacity = 0;
ui64 NoDstActorId = 0;
+ TDuration BlockedTime;
+ std::optional<TInstant> StartBlockedTime;
+
};
THolder<TStats> Stats;
@@ -1280,6 +1287,24 @@ protected:
return allowedOvercommit;
}
+protected:
+
+ void UpdateBlocked(TOutputChannelInfo& outputChannel, i64 toSend) {
+ if (Y_UNLIKELY(outputChannel.Stats)) {
+ if (toSend <= 0) {
+ outputChannel.Stats->BlockedByCapacity++;
+ if (!outputChannel.Stats->StartBlockedTime) {
+ outputChannel.Stats->StartBlockedTime = TInstant::Now();
+ }
+ } else {
+ if (outputChannel.Stats->StartBlockedTime) {
+ outputChannel.Stats->BlockedTime += TInstant::Now() - *outputChannel.Stats->StartBlockedTime;
+ outputChannel.Stats->StartBlockedTime.reset();
+ }
+ }
+ }
+ }
+
private:
virtual const TDqMemoryQuota::TProfileStats* GetProfileStats() const {
Y_VERIFY(MemoryQuota);
@@ -1309,11 +1334,7 @@ private:
ProcessOutputsState.HasDataToSend |= !outputChannel.Finished;
ProcessOutputsState.AllOutputsFinished &= outputChannel.Finished;
- if (toSend <= 0) {
- if (Y_UNLIKELY(outputChannel.Stats)) {
- outputChannel.Stats->BlockedByCapacity++;
- }
- }
+ UpdateBlocked(outputChannel, toSend);
i64 remains = toSend;
while (remains > 0 && (!outputChannel.Finished || Checkpoints)) {
@@ -1811,6 +1832,7 @@ private:
TInputChannelInfo(
LogPrefix,
channel.GetId(),
+ channel.GetSrcStageId(),
channel.GetWatermarksMode(),
channel.GetCheckpointingMode())
);
@@ -1833,7 +1855,7 @@ private:
YQL_ENSURE(result.second);
} else {
for (auto& channel : outputDesc.GetChannels()) {
- TOutputChannelInfo outputChannel(channel.GetId());
+ TOutputChannelInfo outputChannel(channel.GetId(), channel.GetDstStageId());
outputChannel.HasPeer = channel.GetDstEndpoint().HasActorId();
outputChannel.IsTransformOutput = outputDesc.HasTransform();
outputChannel.WatermarksMode = channel.GetWatermarksMode();
@@ -2034,18 +2056,28 @@ public:
protoInputChannelStats.SetPollRequests(caChannelStats->PollRequests);
protoInputChannelStats.SetWaitTimeUs(caChannelStats->WaitTime.MicroSeconds());
protoInputChannelStats.SetResentMessages(caChannelStats->ResentMessages);
+ protoInputChannelStats.SetFirstMessageMs(caChannelStats->FirstMessageTs.MilliSeconds());
+ protoInputChannelStats.SetLastMessageMs(caChannelStats->LastMessageTs.MilliSeconds());
+ }
+
+ if (auto* inputInfo = InputChannelsMap.FindPtr(protoInputChannelStats.GetChannelId())) {
+ protoInputChannelStats.SetSrcStageId(inputInfo->SrcStageId);
}
}
for (auto& protoOutputChannelStats : *protoTask->MutableOutputChannels()) {
- if (auto* x = Channels->GetOutputChannelStats(protoOutputChannelStats.GetChannelId())) {
- protoOutputChannelStats.SetResentMessages(x->ResentMessages);
+ if (auto* caChannelStats = Channels->GetOutputChannelStats(protoOutputChannelStats.GetChannelId())) {
+ protoOutputChannelStats.SetResentMessages(caChannelStats->ResentMessages);
+ protoOutputChannelStats.SetFirstMessageMs(caChannelStats->FirstMessageTs.MilliSeconds());
+ protoOutputChannelStats.SetLastMessageMs(caChannelStats->LastMessageTs.MilliSeconds());
}
if (auto* outputInfo = OutputChannelsMap.FindPtr(protoOutputChannelStats.GetChannelId())) {
- if (auto *x = outputInfo->Stats.Get()) {
- protoOutputChannelStats.SetBlockedByCapacity(x->BlockedByCapacity);
- protoOutputChannelStats.SetNoDstActorId(x->NoDstActorId);
+ protoOutputChannelStats.SetDstStageId(outputInfo->DstStageId);
+ if (auto *outputChannelStats = outputInfo->Stats.Get()) {
+ protoOutputChannelStats.SetBlockedTimeUs(outputChannelStats->BlockedTime.MicroSeconds());
+ protoOutputChannelStats.SetBlockedByCapacity(outputChannelStats->BlockedByCapacity);
+ protoOutputChannelStats.SetNoDstActorId(outputChannelStats->NoDstActorId);
}
}
}
@@ -2060,6 +2092,8 @@ public:
if (last && MemoryQuota) {
MemoryQuota->ResetProfileStats();
}
+
+ // Cerr << "STAAT\n" << dst->DebugString() << Endl;
}
protected:
diff --git a/ydb/library/yql/dq/actors/protos/dq_stats.proto b/ydb/library/yql/dq/actors/protos/dq_stats.proto
index d47176b2a49..a8ac09a2106 100644
--- a/ydb/library/yql/dq/actors/protos/dq_stats.proto
+++ b/ydb/library/yql/dq/actors/protos/dq_stats.proto
@@ -33,6 +33,7 @@ message TDqAsyncInputBufferStats {
message TDqInputChannelStats {
// basic stats
uint64 ChannelId = 1;
+ uint32 SrcStageId = 20;
uint64 Chunks = 2;
uint64 Bytes = 3;
uint64 RowsIn = 4;
@@ -44,7 +45,11 @@ message TDqInputChannelStats {
uint32 PollRequests = 8;
uint32 ResentMessages = 9;
- uint64 WaitTimeUs = 10;
+ uint64 IdleTimeUs = 11; // wait time until 1st message received
+ uint64 WaitTimeUs = 10; // wait time after 1st message received
+
+ uint64 FirstMessageMs = 30;
+ uint64 LastMessageMs = 31;
google.protobuf.Any Extra = 100;
}
@@ -69,6 +74,7 @@ message TDqAsyncOutputBufferStats {
message TDqOutputChannelStats {
// basic stats
uint64 ChannelId = 1;
+ uint32 DstStageId = 20;
uint64 Chunks = 2;
uint64 Bytes = 3;
uint64 RowsIn = 4;
@@ -86,6 +92,11 @@ message TDqOutputChannelStats {
uint64 SpilledRows = 13;
uint64 SpilledBlobs = 14;
+ uint64 BlockedTimeUs = 15;
+
+ uint64 FirstMessageMs = 30;
+ uint64 LastMessageMs = 31;
+
google.protobuf.Any Extra = 100;
}
diff --git a/ydb/library/yql/dq/proto/dq_tasks.proto b/ydb/library/yql/dq/proto/dq_tasks.proto
index da6b4e5df06..bad0b318533 100644
--- a/ydb/library/yql/dq/proto/dq_tasks.proto
+++ b/ydb/library/yql/dq/proto/dq_tasks.proto
@@ -72,6 +72,8 @@ message TChannel {
uint64 Id = 1;
EDataTransportVersion TransportVersion = 2;
+ uint32 SrcStageId = 11;
+ uint32 DstStageId = 12;
uint64 SrcTaskId = 3;
uint64 DstTaskId = 4;
TEndpoint SrcEndpoint = 5;
diff --git a/ydb/library/yql/dq/tasks/dq_connection_builder.h b/ydb/library/yql/dq/tasks/dq_connection_builder.h
index 14c4f871f31..7401576c613 100644
--- a/ydb/library/yql/dq/tasks/dq_connection_builder.h
+++ b/ydb/library/yql/dq/tasks/dq_connection_builder.h
@@ -59,8 +59,10 @@ void BuildUnionAllChannels(TGraph& graph, const typename TGraph::TStageInfoType&
auto& originTask = graph.GetTask(originTaskId);
auto& channel = graph.AddChannel();
+ channel.SrcStageId = inputStageInfo.Id;
channel.SrcTask = originTaskId;
channel.SrcOutputIndex = outputIndex;
+ channel.DstStageId = stageInfo.Id;
channel.DstTask = targetTask.Id;
channel.DstInputIndex = inputIndex;
channel.InMemory = !enableSpilling || inputStageInfo.OutputsCount == 1;
@@ -108,8 +110,10 @@ void BuildHashShuffleChannels(TGraph& graph, const typename TGraph::TStageInfoTy
auto& targetTask = graph.GetTask(targetTaskId);
auto& channel = graph.AddChannel();
+ channel.SrcStageId = inputStageInfo.Id;
channel.SrcTask = originTask.Id;
channel.SrcOutputIndex = outputIndex;
+ channel.DstStageId = stageInfo.Id;
channel.DstTask = targetTask.Id;
channel.DstInputIndex = inputIndex;
channel.InMemory = !enableSpilling || inputStageInfo.OutputsCount == 1;
@@ -156,8 +160,10 @@ void BuildMapChannels(TGraph& graph, const typename TGraph::TStageInfoType& stag
auto targetTaskId = targetTasks[i];
auto& channel = graph.AddChannel();
+ channel.SrcStageId = inputStageInfo.Id;
channel.SrcTask = originTaskId;
channel.SrcOutputIndex = outputIndex;
+ channel.DstStageId = stageInfo.Id;
channel.DstTask = targetTaskId;
channel.DstInputIndex = inputIndex;
channel.InMemory = !enableSpilling || inputStageInfo.OutputsCount == 1;
@@ -207,8 +213,10 @@ void BuildBroadcastChannels(TGraph& graph, const typename TGraph::TStageInfoType
auto targetTaskId = targetTasks[i];
auto& channel = graph.AddChannel();
+ channel.SrcStageId = inputStageInfo.Id;
channel.SrcTask = originTaskId;
channel.SrcOutputIndex = outputIndex;
+ channel.DstStageId = stageInfo.Id;
channel.DstTask = targetTaskId;
channel.DstInputIndex = inputIndex;
channel.InMemory = !enableSpilling || inputStageInfo.OutputsCount == 1;
@@ -248,8 +256,10 @@ void BuildMergeChannels(TGraph& graph, const typename TGraph::TStageInfoType& st
auto& originTask = graph.GetTask(originTaskId);
auto& channel = graph.AddChannel();
+ channel.SrcStageId = inputStageInfo.Id;
channel.SrcTask = originTaskId;
channel.SrcOutputIndex = outputIndex;
+ channel.DstStageId = stageInfo.Id;
channel.DstTask = targetTask.Id;
channel.DstInputIndex = inputIndex;
channel.InMemory = true;
diff --git a/ydb/library/yql/dq/tasks/dq_tasks_graph.h b/ydb/library/yql/dq/tasks/dq_tasks_graph.h
index ca5ddbe86b7..4d6aeb1d94c 100644
--- a/ydb/library/yql/dq/tasks/dq_tasks_graph.h
+++ b/ydb/library/yql/dq/tasks/dq_tasks_graph.h
@@ -15,6 +15,7 @@ struct TStageId {
ui64 TxId = 0;
ui64 StageId = 0;
+ TStageId() = default;
TStageId(ui64 txId, ui64 stageId)
: TxId(txId)
, StageId(stageId) {}
@@ -79,13 +80,17 @@ struct TStageInfo : private TMoveOnly {
struct TChannel {
ui64 Id = 0;
+ TStageId SrcStageId;
ui64 SrcTask = 0;
ui32 SrcOutputIndex = 0;
+ TStageId DstStageId;
ui64 DstTask = 0;
ui32 DstInputIndex = 0;
bool InMemory = true;
NDqProto::ECheckpointingMode CheckpointingMode = NDqProto::CHECKPOINTING_MODE_DEFAULT;
NDqProto::EWatermarksMode WatermarksMode = NDqProto::WATERMARKS_MODE_DISABLED;
+
+ TChannel() = default;
};
using TChannelList = TVector<ui64>;
@@ -280,7 +285,8 @@ public:
}
TChannel& AddChannel() {
- TChannel channel{Channels.size() + 1};
+ TChannel channel;
+ channel.Id = Channels.size() + 1;
return Channels.emplace_back(channel);
}
diff --git a/ydb/library/yql/providers/dq/actors/task_controller_impl.h b/ydb/library/yql/providers/dq/actors/task_controller_impl.h
index 667f4197d5f..d032d2fd7d8 100644
--- a/ydb/library/yql/providers/dq/actors/task_controller_impl.h
+++ b/ydb/library/yql/providers/dq/actors/task_controller_impl.h
@@ -295,9 +295,9 @@ private:
auto& counter = *ServiceCounters.PublicCounters->GetNamedCounter("name", publicCounterName, isDeriv);
if (name == "MultiHop_LateThrownEventsCount") {
// the only incremental sensor from TaskRunner
- counter += v.Count;
+ counter += v.Sum;
} else {
- counter = v.Count;
+ counter = v.Sum;
}
}
}
@@ -409,6 +409,7 @@ private:
for (const auto& stats : s.GetInputChannels()) {
auto labels = commonLabels;
labels["InputChannel"] = ToString(stats.GetChannelId());
+ labels["SrcStageId"] = ToString(stats.GetSrcStageId());
ADD_COUNTER(Chunks);
ADD_COUNTER(Bytes);
@@ -417,6 +418,18 @@ private:
ADD_COUNTER(MaxMemoryUsage);
ADD_COUNTER(DeserializationTimeUs);
+ ADD_COUNTER(IdleTimeUs);
+ ADD_COUNTER(WaitTimeUs);
+ ADD_COUNTER(FirstMessageMs);
+ ADD_COUNTER(LastMessageMs);
+
+ if (stats.GetFirstMessageMs() && stats.GetLastMessageMs()) {
+ TaskStat.SetCounter(TaskStat.GetCounterName("TaskRunner", labels, "ActiveTimeUs"),
+ ( TInstant::MilliSeconds(stats.GetLastMessageMs()) -
+ TInstant::MilliSeconds(stats.GetFirstMessageMs()) ).MicroSeconds()
+ );
+ }
+
// if (stats.GetFinishTs() >= stats.GetStartTs()) {
// TaskStat.SetCounter(TaskStat.GetCounterName("TaskRunner", labels, "Total"), stats.GetFinishTs() - stats.GetStartTs());
// }
@@ -425,6 +438,7 @@ private:
for (const auto& stats : s.GetOutputChannels()) {
auto labels = commonLabels;
labels["OutputChannel"] = ToString(stats.GetChannelId());
+ labels["DstStageId"] = ToString(stats.GetDstStageId());
ADD_COUNTER(Chunks)
ADD_COUNTER(Bytes);
@@ -439,6 +453,17 @@ private:
ADD_COUNTER(SpilledRows);
ADD_COUNTER(SpilledBlobs);
+ ADD_COUNTER(BlockedTimeUs);
+ ADD_COUNTER(FirstMessageMs);
+ ADD_COUNTER(LastMessageMs);
+
+ if (stats.GetFirstMessageMs() && stats.GetLastMessageMs()) {
+ TaskStat.SetCounter(TaskStat.GetCounterName("TaskRunner", labels, "ActiveTimeUs"),
+ ( TInstant::MilliSeconds(stats.GetLastMessageMs()) -
+ TInstant::MilliSeconds(stats.GetFirstMessageMs()) ).MicroSeconds()
+ );
+ }
+
// if (stats.GetFinishTs() >= stats.GetStartTs()) {
// TaskStat.SetCounter(TaskStat.GetCounterName("TaskRunner", labels, "Total"), stats.GetFinishTs() - stats.GetStartTs());
// }
diff --git a/ydb/library/yql/providers/dq/counters/counters.cpp b/ydb/library/yql/providers/dq/counters/counters.cpp
index 87b39735d0f..df3171b0043 100644
--- a/ydb/library/yql/providers/dq/counters/counters.cpp
+++ b/ydb/library/yql/providers/dq/counters/counters.cpp
@@ -49,8 +49,6 @@ TCounters AggregateQueryStatsByStage(TCounters& queryStat, const THashMap<ui64,
std::map<TString, TString> labels;
TString prefix, name;
if (k.StartsWith("TaskRunner") && NCommon::ParseCounterName(&prefix, &labels, &name, k)) {
- auto maybeInputChannel = labels.find("InputChannel");
- auto maybeOutputChannel = labels.find("OutputChannel");
auto maybeTask = labels.find("Task");
if (maybeTask == labels.end()) {
aggregatedQueryStat.AddCounter(k, v);
@@ -65,29 +63,56 @@ TCounters AggregateQueryStatsByStage(TCounters& queryStat, const THashMap<ui64,
? "0"
: ToString(maybeStage->second);
ui64 channelId;
+ bool input = false;
+ bool output = false;
+ auto maybeInputChannel = labels.find("InputChannel");
if (maybeInputChannel != labels.end()) {
if (!TryFromString(maybeInputChannel->second, channelId)) {
continue;
}
+ ui32 stage = 0;
+ auto maybeSrcStageId = labels.find("SrcStageId");
+ if (maybeSrcStageId != labels.end()) {
+ TryFromString(maybeSrcStageId->second, stage);
+ labels.erase(maybeSrcStageId);
+ }
stage2Input[stageId].insert(channelId);
stage2Input["Total"].insert(channelId);
labels.erase(maybeInputChannel);
- labels["Input"] = "1";
+ labels["Input"] = ToString(stage);
+ input = true;
}
+ auto maybeOutputChannel = labels.find("OutputChannel");
if (maybeOutputChannel != labels.end()) {
if (!TryFromString(maybeOutputChannel->second, channelId)) {
continue;
}
+ ui32 stage = 0;
+ auto maybeDstStageId = labels.find("DstStageId");
+ if (maybeDstStageId != labels.end()) {
+ TryFromString(maybeDstStageId->second, stage);
+ labels.erase(maybeDstStageId);
+ }
stage2Output[stageId].insert(channelId);
stage2Output["Total"].insert(channelId);
labels.erase(maybeOutputChannel);
- labels["Output"] = "1";
+ labels["Output"] = ToString(stage);
+ output = true;
}
labels.erase(maybeTask);
labels["Stage"] = ToString(stageId);
stage2Tasks[stageId].insert(taskId);
stage2Tasks["Total"].insert(taskId);
aggregatedQueryStat.AddCounter(queryStat.GetCounterName("TaskRunner", labels, name), v);
+ if (input || output) {
+ if (input) {
+ labels["Input"] = "Total";
+ }
+ if (output) {
+ labels["Output"] = "Total";
+ }
+ aggregatedQueryStat.AddCounter(queryStat.GetCounterName("TaskRunner", labels, name), v);
+ }
labels["Stage"] = "Total";
aggregatedQueryStat.AddCounter(queryStat.GetCounterName("TaskRunner", labels, name), v);
} else {
@@ -124,11 +149,11 @@ TCounters AggregateQueryStatsByStage(TCounters& queryStat, const THashMap<ui64,
}
for (const auto& [stageId, v] : stage2Input) {
aggregatedQueryStat.AddCounter(queryStat.GetCounterName("TaskRunner",
- {{"Stage", stageId},{"Input", "1"}}, "ChannelsCount"), static_cast<ui64>(v.size()));
+ {{"Stage", stageId},{"Input", "Total"}}, "ChannelsCount"), static_cast<ui64>(v.size()));
}
- for (const auto& [stageId, v] : stage2Input) {
+ for (const auto& [stageId, v] : stage2Output) {
aggregatedQueryStat.AddCounter(queryStat.GetCounterName("TaskRunner",
- {{"Stage", stageId},{"Output", "1"}}, "ChannelsCount"), static_cast<ui64>(v.size()));
+ {{"Stage", stageId},{"Output", "Total"}}, "ChannelsCount"), static_cast<ui64>(v.size()));
}
aggregatedQueryStat.AddCounter("StagesCount", static_cast<ui64>(stage2Tasks.size()));
diff --git a/ydb/library/yql/providers/dq/counters/counters.h b/ydb/library/yql/providers/dq/counters/counters.h
index ec05baa82da..de37fc7a317 100644
--- a/ydb/library/yql/providers/dq/counters/counters.h
+++ b/ydb/library/yql/providers/dq/counters/counters.h
@@ -56,14 +56,12 @@ struct TCounters {
template<typename T>
void AddCounter(const TString& name, T value) const {
- auto& counter = Counters[name];
- counter.Count += value;
+ Counters[name].Add(TEntry(value));
}
template<typename T>
void SetCounter(const TString& name, T value) const {
- auto& counter = Counters[name];
- counter.Count = value;
+ Counters[name] = TEntry(value);
}
THashMap<i64, ui64>& GetHistogram(const TString& name) {
@@ -142,6 +140,25 @@ struct TCounters {
i64 Min = 0;
i64 Avg = 0;
i64 Count = 0;
+
+ TEntry() = default;
+ explicit TEntry(i64 value) {
+ Sum = value;
+ Max = value;
+ Min = value;
+ Avg = value;
+ Count = 1;
+ }
+
+ void Add(const TEntry& entry) {
+ if (entry.Count) {
+ Sum += entry.Sum;
+ Min = (Count == 0) ? entry.Min : ::Min(Min, entry.Min);
+ Max = (Count == 0) ? entry.Max : ::Max(Max, entry.Max);
+ Count += entry.Count;
+ Avg = Sum / Count;
+ }
+ }
};
struct TCounterBlock {
@@ -171,18 +188,7 @@ struct TCounters {
}
void AddCounter(const TString& name, const TEntry& value) const {
- auto& counter = Counters[name];
- if (value.Count) {
- counter.Sum += value.Sum;
- counter.Min = counter.Count == 0
- ? value.Min
- : Min(counter.Min, value.Min);
- counter.Max = counter.Count == 0
- ? value.Max
- : Max(counter.Max, value.Max);
- counter.Count += value.Count;
- counter.Avg = counter.Sum / counter.Count;
- }
+ Counters[name].Add(value);
}
void Clear() const {
diff --git a/ydb/library/yql/providers/dq/planner/execution_planner.cpp b/ydb/library/yql/providers/dq/planner/execution_planner.cpp
index dbec17dec57..89d7cc995bc 100644
--- a/ydb/library/yql/providers/dq/planner/execution_planner.cpp
+++ b/ydb/library/yql/providers/dq/planner/execution_planner.cpp
@@ -407,7 +407,7 @@ namespace NYql::NDqs {
tasks[i].ComputeActorId = workers[i];
}
- THashMap<TStageId, std::tuple<TString, ui64, ui64>> stagePrograms = BuildAllPrograms();
+ BuildAllPrograms();
TVector<TDqTask> plan;
THashSet<TString> clusterNameHints;
for (const auto& task : tasks) {
@@ -450,7 +450,7 @@ namespace NYql::NDqs {
program.SetRuntimeVersion(NYql::NDqProto::ERuntimeVersion::RUNTIME_VERSION_YQL_1_0);
TString programStr;
ui64 stageId, publicId;
- std::tie(programStr, stageId, publicId) = stagePrograms[task.StageId];
+ std::tie(programStr, stageId, publicId) = StagePrograms[task.StageId];
program.SetRaw(programStr);
taskMeta.SetStageId(publicId);
taskDesc.MutableMeta()->PackFrom(taskMeta);
@@ -589,10 +589,11 @@ namespace NYql::NDqs {
#undef BUILD_CONNECTION
-THashMap<TStageId, std::tuple<TString,ui64,ui64>> TDqsExecutionPlanner::BuildAllPrograms() {
+void TDqsExecutionPlanner::BuildAllPrograms() {
using namespace NKikimr::NMiniKQL;
- THashMap<TStageId, std::tuple<TString,ui64,ui64>> result;
+ StagePrograms.clear();
+
TScopedAlloc alloc(__LOCATION__, NKikimr::TAlignedPagePoolCounters(), FunctionRegistry->SupportsSizedAllocators());
TTypeEnvironment typeEnv(alloc);
TVector<NNodes::TExprBase> fakeReads;
@@ -623,18 +624,18 @@ THashMap<TStageId, std::tuple<TString,ui64,ui64>> TDqsExecutionPlanner::BuildAll
Y_VERIFY(false);
}
*/
- result[stageInfo.first] = std::make_tuple(
+ StagePrograms[stageInfo.first] = std::make_tuple(
NDq::BuildProgram(
stage.Program(), *paramsType, compiler, typeEnv, *FunctionRegistry,
ExprContext, fakeReads),
stageId, publicId);
}
-
- return result;
}
void TDqsExecutionPlanner::FillChannelDesc(NDqProto::TChannel& channelDesc, const NDq::TChannel& channel) {
channelDesc.SetId(channel.Id);
+ channelDesc.SetSrcStageId(std::get<2>(StagePrograms[channel.SrcStageId]));
+ channelDesc.SetDstStageId(std::get<2>(StagePrograms[channel.DstStageId]));
channelDesc.SetSrcTaskId(channel.SrcTask);
channelDesc.SetDstTaskId(channel.DstTask);
channelDesc.SetCheckpointingMode(channel.CheckpointingMode);
@@ -785,6 +786,7 @@ THashMap<TStageId, std::tuple<TString,ui64,ui64>> TDqsExecutionPlanner::BuildAll
auto channelDesc = outputDesc->AddChannels();
channelDesc->SetId(1);
+ channelDesc->SetSrcStageId(1);
channelDesc->SetSrcTaskId(2);
channelDesc->SetDstTaskId(1);
diff --git a/ydb/library/yql/providers/dq/planner/execution_planner.h b/ydb/library/yql/providers/dq/planner/execution_planner.h
index cacb6ea2c71..4c3abee30fd 100644
--- a/ydb/library/yql/providers/dq/planner/execution_planner.h
+++ b/ydb/library/yql/providers/dq/planner/execution_planner.h
@@ -67,7 +67,7 @@ namespace NYql::NDqs {
private:
bool BuildReadStage(const NNodes::TDqPhyStage& stage, bool dqSource, bool canFallback);
void BuildConnections(const NNodes::TDqPhyStage& stage);
- THashMap<NDq::TStageId, std::tuple<TString,ui64,ui64>> BuildAllPrograms();
+ void BuildAllPrograms();
void FillChannelDesc(NDqProto::TChannel& channelDesc, const NDq::TChannel& channel);
void FillInputDesc(NDqProto::TTaskInput& inputDesc, const TTaskInput& input);
void FillOutputDesc(NDqProto::TTaskOutput& outputDesc, const TTaskOutput& output);
@@ -93,6 +93,7 @@ namespace NYql::NDqs {
TVector<NDqProto::TDqTask> Tasks;
THashMap<ui64, ui32> PublicIds;
+ THashMap<NDq::TStageId, std::tuple<TString,ui64,ui64>> StagePrograms;
};
// Execution planner for TRuntimeNode