diff options
author | aozeritsky <aozeritsky@ydb.tech> | 2023-06-01 16:18:39 +0300 |
---|---|---|
committer | aozeritsky <aozeritsky@ydb.tech> | 2023-06-01 16:18:39 +0300 |
commit | 308587966b506c17c0a15004ee2181bfc4a5cb37 (patch) | |
tree | d9e956819653daf6b9ada27276b2d3b9133fe660 | |
parent | ee5d45522deb27bb93f485b36b8c066e004b8cc6 (diff) | |
download | ydb-308587966b506c17c0a15004ee2181bfc4a5cb37.tar.gz |
Can collect raw statistics in dq provider
4 files changed, 90 insertions, 63 deletions
diff --git a/ydb/library/yql/providers/dq/actors/task_controller_impl.h b/ydb/library/yql/providers/dq/actors/task_controller_impl.h index bb1458a7bc7..667f4197d5f 100644 --- a/ydb/library/yql/providers/dq/actors/task_controller_impl.h +++ b/ydb/library/yql/providers/dq/actors/task_controller_impl.h @@ -66,7 +66,11 @@ public: , Settings(settings) , ServiceCounters(serviceCounters, "task_controller") , PingPeriod(pingPeriod) - , AggrPeriod(aggrPeriod) + , AggrPeriod( + settings->AggregateStatsByStage.Get().GetOrElse(TDqSettings::TDefault::AggregateStatsByStage) + ? aggrPeriod + : TDuration::Zero() + ) , Issues(CreateDefaultTimeProvider()) { if (Settings) { @@ -333,16 +337,20 @@ private: YQL_ENSURE(x.GetTasks().size() == 1); auto& s = x.GetTasks(0); ui64 taskId = s.GetTaskId(); + ui64 stageId = s.GetStageId(); #define ADD_COUNTER(name) \ if (stats.Get ## name()) { \ TaskStat.SetCounter(TaskStat.GetCounterName("TaskRunner", labels, #name), stats.Get ## name ()); \ } - std::map<TString, TString> labels = { - {"Task", ToString(taskId)} + std::map<TString, TString> commonLabels = { + {"Task", ToString(taskId)}, + {"Stage", ToString(stageId)} }; + auto labels = commonLabels; + auto& stats = s; // basic stats ADD_COUNTER(CpuTimeUs) @@ -399,10 +407,8 @@ private: // } for (const auto& stats : s.GetInputChannels()) { - std::map<TString, TString> labels = { - {"Task", ToString(taskId)}, - {"InputChannel", ToString(stats.GetChannelId())} - }; + auto labels = commonLabels; + labels["InputChannel"] = ToString(stats.GetChannelId()); ADD_COUNTER(Chunks); ADD_COUNTER(Bytes); @@ -417,10 +423,8 @@ private: } for (const auto& stats : s.GetOutputChannels()) { - std::map<TString, TString> labels = { - {"Task", ToString(taskId)}, - {"OutputChannel", ToString(stats.GetChannelId())} - }; + auto labels = commonLabels; + labels["OutputChannel"] = ToString(stats.GetChannelId()); ADD_COUNTER(Chunks) ADD_COUNTER(Bytes); @@ -441,10 +445,8 @@ private: } for (const auto& stats : s.GetSources()) { - std::map<TString, TString> labels = { - {"Task", ToString(taskId)}, - {"Source", ToString(stats.GetInputIndex())} - }; + auto labels = commonLabels; + labels["Source"] = ToString(stats.GetInputIndex()); ADD_COUNTER(Chunks); ADD_COUNTER(Bytes); @@ -461,10 +463,8 @@ private: } for (const auto& stats : s.GetSinks()) { - std::map<TString, TString> labels = { - {"Task", ToString(taskId)}, - {"Sink", ToString(stats.GetOutputIndex())} - }; + auto labels = commonLabels; + labels["Sink"] = ToString(stats.GetOutputIndex()); ADD_COUNTER(Chunks) ADD_COUNTER(Bytes); diff --git a/ydb/library/yql/providers/dq/common/yql_dq_settings.h b/ydb/library/yql/providers/dq/common/yql_dq_settings.h index 1370c4f7958..c6239bb8bb6 100644 --- a/ydb/library/yql/providers/dq/common/yql_dq_settings.h +++ b/ydb/library/yql/providers/dq/common/yql_dq_settings.h @@ -146,6 +146,7 @@ struct TDqSettings { SAVE_SETTING(HashShuffleMaxTasks); SAVE_SETTING(UseWideChannels); SAVE_SETTING(UseFastPickleTransport); + SAVE_SETTING(AggregateStatsByStage); #undef SAVE_SETTING } diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_provider_ut.cpp b/ydb/library/yql/providers/dq/provider/yql_dq_provider_ut.cpp index 9b82ab1d0f7..5ab6e17db64 100644 --- a/ydb/library/yql/providers/dq/provider/yql_dq_provider_ut.cpp +++ b/ydb/library/yql/providers/dq/provider/yql_dq_provider_ut.cpp @@ -130,7 +130,7 @@ Y_UNIT_TEST(CollectTaskRunnerStatisticsByTask) { TCounters::GetCounterName( "Prefix", { - {"Input", "2"}, + {"InputChannel", "2"}, {"Stage", "10"}, {"Task", "1"}, }, @@ -144,7 +144,7 @@ Y_UNIT_TEST(CollectTaskRunnerStatisticsByTask) { TCounters::GetCounterName( "Prefix", { - {"Output", "3"}, + {"OutputChannel", "3"}, {"Stage", "10"}, {"Task", "1"}, }, @@ -172,32 +172,34 @@ Y_UNIT_TEST(CollectTaskRunnerStatisticsByTask) { CollectTaskRunnerStatisticsByTask(writer, taskRunner); } TString expected = R"__({ - "Task=1" = { - "Input=2" = { - "Counter1" = { - "sum" = 1; - "count" = 5; - "avg" = 4; - "max" = 2; - "min" = 3 - } - }; - "Output=3" = { - "Counter2" = { - "sum" = 1; - "count" = 5; - "avg" = 4; - "max" = 2; - "min" = 3 - } - }; - "Generic" = { - "Counter3" = { - "sum" = 1; - "count" = 5; - "avg" = 4; - "max" = 2; - "min" = 3 + "Stage=10" = { + "Task=1" = { + "Input=2" = { + "Counter1" = { + "sum" = 1; + "count" = 5; + "avg" = 4; + "max" = 2; + "min" = 3 + } + }; + "Output=3" = { + "Counter2" = { + "sum" = 1; + "count" = 5; + "avg" = 4; + "max" = 2; + "min" = 3 + } + }; + "Generic" = { + "Counter3" = { + "sum" = 1; + "count" = 5; + "avg" = 4; + "max" = 2; + "min" = 3 + } } } } diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_statistics_json.cpp b/ydb/library/yql/providers/dq/provider/yql_dq_statistics_json.cpp index a78ff6ab93e..87610336f1e 100644 --- a/ydb/library/yql/providers/dq/provider/yql_dq_statistics_json.cpp +++ b/ydb/library/yql/providers/dq/provider/yql_dq_statistics_json.cpp @@ -10,9 +10,19 @@ namespace { struct TCountersByTask { THashMap<TString, TOperationStatistics> Inputs; THashMap<TString, TOperationStatistics> Outputs; + THashMap<TString, TOperationStatistics> Sources; + THashMap<TString, TOperationStatistics> Sinks; TOperationStatistics Generic; }; +void WriteEntries(NYson::TYsonWriter& writer, TStringBuf prefix, const THashMap<TString, TOperationStatistics>& data) { + + for (const auto& [k, v] : data) { + writer.OnKeyedItem(prefix + k); + NCommon::WriteStatistics(writer, v); + } +} + } // namespace void CollectTaskRunnerStatisticsByStage(NYson::TYsonWriter& writer, const TOperationStatistics& taskRunner, bool totalOnly) @@ -78,7 +88,7 @@ void CollectTaskRunnerStatisticsByStage(NYson::TYsonWriter& writer, const TOpera void CollectTaskRunnerStatisticsByTask(NYson::TYsonWriter& writer, const TOperationStatistics& taskRunner) { - THashMap<TString, TCountersByTask> countersByTask; + THashMap<TString, THashMap<TString, TCountersByTask>> countersByStage; for (const auto& entry : taskRunner.Entries) { TString prefix, name; @@ -86,15 +96,22 @@ void CollectTaskRunnerStatisticsByTask(NYson::TYsonWriter& writer, const TOperat if (!NCommon::ParseCounterName(&prefix, &labels, &name, entry.Name)) { continue; } - auto maybeInput = labels.find("Input"); - auto maybeOutput = labels.find("Output"); + auto maybeInput = labels.find("InputChannel"); + auto maybeOutput = labels.find("OutputChannel"); + auto maybeSource = labels.find("Source"); + auto maybeSink = labels.find("Sink"); auto maybeTask = labels.find("Task"); + auto maybeStage = labels.find("Stage"); if (maybeTask == labels.end()) { continue; } + if (maybeStage == labels.end()) { + continue; + } auto newEntry = entry; newEntry.Name = name; + auto& countersByTask = countersByStage[maybeStage->second]; auto& counters = countersByTask[maybeTask->second]; if (maybeInput != labels.end()) { counters.Inputs[maybeInput->second].Entries.emplace_back(newEntry); @@ -102,6 +119,12 @@ void CollectTaskRunnerStatisticsByTask(NYson::TYsonWriter& writer, const TOperat if (maybeOutput != labels.end()) { counters.Outputs[maybeOutput->second].Entries.emplace_back(newEntry); } + if (maybeSource != labels.end()) { + counters.Sources[maybeSource->second].Entries.emplace_back(newEntry); + } + if (maybeSink != labels.end()) { + counters.Sinks[maybeSink->second].Entries.emplace_back(newEntry); + } if (maybeInput == labels.end() && maybeOutput == labels.end()) { counters.Generic.Entries.emplace_back(newEntry); } @@ -109,24 +132,25 @@ void CollectTaskRunnerStatisticsByTask(NYson::TYsonWriter& writer, const TOperat writer.OnBeginMap(); - for (const auto& [task, counters] : countersByTask) { - writer.OnKeyedItem("Task=" + task); - + for (const auto& [stage, countersByTask] : countersByStage) { + writer.OnKeyedItem("Stage=" + stage); writer.OnBeginMap(); - for (const auto& [input, stat] : counters.Inputs) { - writer.OnKeyedItem("Input=" + input); - NCommon::WriteStatistics(writer, stat); - } + for (const auto& [task, counters] : countersByTask) { + writer.OnKeyedItem("Task=" + task); - for (const auto& [output, stat] : counters.Outputs) { - writer.OnKeyedItem("Output=" + output); - NCommon::WriteStatistics(writer, stat); - } + writer.OnBeginMap(); + + WriteEntries(writer, "Input=", counters.Inputs); + WriteEntries(writer, "Output=", counters.Outputs); + WriteEntries(writer, "Source=", counters.Sources); + WriteEntries(writer, "Sink=", counters.Sinks); - writer.OnKeyedItem("Generic"); - NCommon::WriteStatistics(writer, counters.Generic); + writer.OnKeyedItem("Generic"); + NCommon::WriteStatistics(writer, counters.Generic); + writer.OnEndMap(); + } writer.OnEndMap(); } |