diff options
author | hor911 <hor911@ydb.tech> | 2023-11-30 13:11:34 +0300 |
---|---|---|
committer | hor911 <hor911@ydb.tech> | 2023-11-30 14:23:47 +0300 |
commit | 154baef288889ebcf43e5fbfb46f57003440bab8 (patch) | |
tree | 1a388f954ecfb90be8efcf96138652e422814e20 | |
parent | a0a9432fa6ffe0a03cb07a06898024529b3f580f (diff) | |
download | ydb-154baef288889ebcf43e5fbfb46f57003440bab8.tar.gz |
Source/Sink in by Stage DQRUN Aggregated Stats
-rw-r--r-- | ydb/library/yql/providers/dq/provider/yql_dq_provider_ut.cpp | 6 | ||||
-rw-r--r-- | ydb/library/yql/providers/dq/provider/yql_dq_statistics_json.cpp | 22 |
2 files changed, 27 insertions, 1 deletions
diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_provider_ut.cpp b/ydb/library/yql/providers/dq/provider/yql_dq_provider_ut.cpp index 5ab6e17db6..339f7299d3 100644 --- a/ydb/library/yql/providers/dq/provider/yql_dq_provider_ut.cpp +++ b/ydb/library/yql/providers/dq/provider/yql_dq_provider_ut.cpp @@ -106,6 +106,12 @@ Y_UNIT_TEST(CollectTaskRunnerStatisticsByStage) { } } }; + "Source" = { + "total" = {} + }; + "Sink" = { + "total" = {} + }; "Task" = { "total" = { "Counter3" = { diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_statistics_json.cpp b/ydb/library/yql/providers/dq/provider/yql_dq_statistics_json.cpp index 87610336f1..ffa849a8b1 100644 --- a/ydb/library/yql/providers/dq/provider/yql_dq_statistics_json.cpp +++ b/ydb/library/yql/providers/dq/provider/yql_dq_statistics_json.cpp @@ -30,6 +30,8 @@ void CollectTaskRunnerStatisticsByStage(NYson::TYsonWriter& writer, const TOpera THashMap<TString, TOperationStatistics> taskRunnerStage; THashMap<TString, TOperationStatistics> taskRunnerInput; THashMap<TString, TOperationStatistics> taskRunnerOutput; + THashMap<TString, TOperationStatistics> taskRunnerSource; + THashMap<TString, TOperationStatistics> taskRunnerSink; for (const auto& entry : taskRunner.Entries) { TString prefix, name; @@ -39,6 +41,8 @@ void CollectTaskRunnerStatisticsByStage(NYson::TYsonWriter& writer, const TOpera } auto maybeInput = labels.find("Input"); auto maybeOutput = labels.find("Output"); + auto maybeSource = labels.find("Source"); + auto maybeSink = labels.find("Sink"); auto maybeStage = labels.find("Stage"); if (maybeStage == labels.end()) { maybeStage = labels.find("Task"); @@ -55,7 +59,15 @@ void CollectTaskRunnerStatisticsByStage(NYson::TYsonWriter& writer, const TOpera auto newEntry = entry; newEntry.Name = name; taskRunnerOutput[maybeStage->second].Entries.push_back(newEntry); } - if (maybeInput == labels.end() && maybeOutput == labels.end()) { + if (maybeSource != labels.end()) { + auto newEntry = entry; newEntry.Name = name; + taskRunnerSource[maybeStage->second].Entries.push_back(newEntry); + } + if (maybeSink != labels.end()) { + auto newEntry = entry; newEntry.Name = name; + taskRunnerSink[maybeStage->second].Entries.push_back(newEntry); + } + if (maybeInput == labels.end() && maybeOutput == labels.end() && maybeSource == labels.end() && maybeSink == labels.end()) { auto newEntry = entry; newEntry.Name = name; taskRunnerStage[maybeStage->second].Entries.push_back(newEntry); } @@ -65,6 +77,8 @@ void CollectTaskRunnerStatisticsByStage(NYson::TYsonWriter& writer, const TOpera for (const auto& [stageId, stat] : taskRunnerStage) { const auto& inputStat = taskRunnerInput[stageId]; const auto& outputStat = taskRunnerOutput[stageId]; + const auto& sourceStat = taskRunnerSource[stageId]; + const auto& sinkStat = taskRunnerSink[stageId]; writer.OnKeyedItem("Stage=" + stageId); { @@ -76,6 +90,12 @@ void CollectTaskRunnerStatisticsByStage(NYson::TYsonWriter& writer, const TOpera writer.OnKeyedItem("Output"); NCommon::WriteStatistics(writer, totalOnly, {{0, outputStat}}); + writer.OnKeyedItem("Source"); + NCommon::WriteStatistics(writer, totalOnly, {{0, sourceStat}}); + + writer.OnKeyedItem("Sink"); + NCommon::WriteStatistics(writer, totalOnly, {{0, sinkStat}}); + writer.OnKeyedItem("Task"); NCommon::WriteStatistics(writer, totalOnly, {{0, stat}}); |