about summary refs log tree commit diff stats
diff options
context:
space:
mode:
author aozeritsky <aozeritsky@ydb.tech> 2023-06-01 16:18:39 +0300
committer aozeritsky <aozeritsky@ydb.tech> 2023-06-01 16:18:39 +0300
commit 308587966b506c17c0a15004ee2181bfc4a5cb37 (patch)
tree d9e956819653daf6b9ada27276b2d3b9133fe660
parent ee5d45522deb27bb93f485b36b8c066e004b8cc6 (diff)
download ydb-308587966b506c17c0a15004ee2181bfc4a5cb37.tar.gz
Can collect raw statistics in dq provider
-rw-r--r-- ydb/library/yql/providers/dq/actors/task_controller_impl.h 38
-rw-r--r-- ydb/library/yql/providers/dq/common/yql_dq_settings.h 1
-rw-r--r-- ydb/library/yql/providers/dq/provider/yql_dq_provider_ut.cpp 58
-rw-r--r-- ydb/library/yql/providers/dq/provider/yql_dq_statistics_json.cpp 56
4 files changed, 90 insertions, 63 deletions
diff --git a/ydb/library/yql/providers/dq/actors/task_controller_impl.h b/ydb/library/yql/providers/dq/actors/task_controller_impl.h
index bb1458a7bc7..667f4197d5f 100644
--- a/ydb/library/yql/providers/dq/actors/task_controller_impl.h
+++ b/ydb/library/yql/providers/dq/actors/task_controller_impl.h
@@ -66,7 +66,11 @@ public:
, Settings(settings)
, ServiceCounters(serviceCounters, "task_controller")
, PingPeriod(pingPeriod)
- , AggrPeriod(aggrPeriod)
+ , AggrPeriod(
+ settings->AggregateStatsByStage.Get().GetOrElse(TDqSettings::TDefault::AggregateStatsByStage)
+ ? aggrPeriod
+ : TDuration::Zero()
+ )
, Issues(CreateDefaultTimeProvider())
{
if (Settings) {
@@ -333,16 +337,20 @@ private:
YQL_ENSURE(x.GetTasks().size() == 1);
auto& s = x.GetTasks(0);
ui64 taskId = s.GetTaskId();
+ ui64 stageId = s.GetStageId();
#define ADD_COUNTER(name) \
if (stats.Get ## name()) { \
TaskStat.SetCounter(TaskStat.GetCounterName("TaskRunner", labels, #name), stats.Get ## name ()); \
}
- std::map<TString, TString> labels = {
- {"Task", ToString(taskId)}
+ std::map<TString, TString> commonLabels = {
+ {"Task", ToString(taskId)},
+ {"Stage", ToString(stageId)}
};
+ auto labels = commonLabels;
+
auto& stats = s;
// basic stats
ADD_COUNTER(CpuTimeUs)
@@ -399,10 +407,8 @@ private:
// }
for (const auto& stats : s.GetInputChannels()) {
- std::map<TString, TString> labels = {
- {"Task", ToString(taskId)},
- {"InputChannel", ToString(stats.GetChannelId())}
- };
+ auto labels = commonLabels;
+ labels["InputChannel"] = ToString(stats.GetChannelId());
ADD_COUNTER(Chunks);
ADD_COUNTER(Bytes);
@@ -417,10 +423,8 @@ private:
}
for (const auto& stats : s.GetOutputChannels()) {
- std::map<TString, TString> labels = {
- {"Task", ToString(taskId)},
- {"OutputChannel", ToString(stats.GetChannelId())}
- };
+ auto labels = commonLabels;
+ labels["OutputChannel"] = ToString(stats.GetChannelId());
ADD_COUNTER(Chunks)
ADD_COUNTER(Bytes);
@@ -441,10 +445,8 @@ private:
}
for (const auto& stats : s.GetSources()) {
- std::map<TString, TString> labels = {
- {"Task", ToString(taskId)},
- {"Source", ToString(stats.GetInputIndex())}
- };
+ auto labels = commonLabels;
+ labels["Source"] = ToString(stats.GetInputIndex());
ADD_COUNTER(Chunks);
ADD_COUNTER(Bytes);
@@ -461,10 +463,8 @@ private:
}
for (const auto& stats : s.GetSinks()) {
- std::map<TString, TString> labels = {
- {"Task", ToString(taskId)},
- {"Sink", ToString(stats.GetOutputIndex())}
- };
+ auto labels = commonLabels;
+ labels["Sink"] = ToString(stats.GetOutputIndex());
ADD_COUNTER(Chunks)
ADD_COUNTER(Bytes);
diff --git a/ydb/library/yql/providers/dq/common/yql_dq_settings.h b/ydb/library/yql/providers/dq/common/yql_dq_settings.h
index 1370c4f7958..c6239bb8bb6 100644
--- a/ydb/library/yql/providers/dq/common/yql_dq_settings.h
+++ b/ydb/library/yql/providers/dq/common/yql_dq_settings.h
@@ -146,6 +146,7 @@ struct TDqSettings {
SAVE_SETTING(HashShuffleMaxTasks);
SAVE_SETTING(UseWideChannels);
SAVE_SETTING(UseFastPickleTransport);
+ SAVE_SETTING(AggregateStatsByStage);
#undef SAVE_SETTING
}
diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_provider_ut.cpp b/ydb/library/yql/providers/dq/provider/yql_dq_provider_ut.cpp
index 9b82ab1d0f7..5ab6e17db64 100644
--- a/ydb/library/yql/providers/dq/provider/yql_dq_provider_ut.cpp
+++ b/ydb/library/yql/providers/dq/provider/yql_dq_provider_ut.cpp
@@ -130,7 +130,7 @@ Y_UNIT_TEST(CollectTaskRunnerStatisticsByTask) {
TCounters::GetCounterName(
"Prefix",
{
- {"Input", "2"},
+ {"InputChannel", "2"},
{"Stage", "10"},
{"Task", "1"},
},
@@ -144,7 +144,7 @@ Y_UNIT_TEST(CollectTaskRunnerStatisticsByTask) {
TCounters::GetCounterName(
"Prefix",
{
- {"Output", "3"},
+ {"OutputChannel", "3"},
{"Stage", "10"},
{"Task", "1"},
},
@@ -172,32 +172,34 @@ Y_UNIT_TEST(CollectTaskRunnerStatisticsByTask) {
CollectTaskRunnerStatisticsByTask(writer, taskRunner);
}
TString expected = R"__({
- "Task=1" = {
- "Input=2" = {
- "Counter1" = {
- "sum" = 1;
- "count" = 5;
- "avg" = 4;
- "max" = 2;
- "min" = 3
- }
- };
- "Output=3" = {
- "Counter2" = {
- "sum" = 1;
- "count" = 5;
- "avg" = 4;
- "max" = 2;
- "min" = 3
- }
- };
- "Generic" = {
- "Counter3" = {
- "sum" = 1;
- "count" = 5;
- "avg" = 4;
- "max" = 2;
- "min" = 3
+ "Stage=10" = {
+ "Task=1" = {
+ "Input=2" = {
+ "Counter1" = {
+ "sum" = 1;
+ "count" = 5;
+ "avg" = 4;
+ "max" = 2;
+ "min" = 3
+ }
+ };
+ "Output=3" = {
+ "Counter2" = {
+ "sum" = 1;
+ "count" = 5;
+ "avg" = 4;
+ "max" = 2;
+ "min" = 3
+ }
+ };
+ "Generic" = {
+ "Counter3" = {
+ "sum" = 1;
+ "count" = 5;
+ "avg" = 4;
+ "max" = 2;
+ "min" = 3
+ }
}
}
}
diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_statistics_json.cpp b/ydb/library/yql/providers/dq/provider/yql_dq_statistics_json.cpp
index a78ff6ab93e..87610336f1e 100644
--- a/ydb/library/yql/providers/dq/provider/yql_dq_statistics_json.cpp
+++ b/ydb/library/yql/providers/dq/provider/yql_dq_statistics_json.cpp
@@ -10,9 +10,19 @@ namespace {
struct TCountersByTask {
THashMap<TString, TOperationStatistics> Inputs;
THashMap<TString, TOperationStatistics> Outputs;
+ THashMap<TString, TOperationStatistics> Sources;
+ THashMap<TString, TOperationStatistics> Sinks;
TOperationStatistics Generic;
};
+void WriteEntries(NYson::TYsonWriter& writer, TStringBuf prefix, const THashMap<TString, TOperationStatistics>& data) {
+
+ for (const auto& [k, v] : data) {
+ writer.OnKeyedItem(prefix + k);
+ NCommon::WriteStatistics(writer, v);
+ }
+}
+
} // namespace
void CollectTaskRunnerStatisticsByStage(NYson::TYsonWriter& writer, const TOperationStatistics& taskRunner, bool totalOnly)
@@ -78,7 +88,7 @@ void CollectTaskRunnerStatisticsByStage(NYson::TYsonWriter& writer, const TOpera
void CollectTaskRunnerStatisticsByTask(NYson::TYsonWriter& writer, const TOperationStatistics& taskRunner)
{
- THashMap<TString, TCountersByTask> countersByTask;
+ THashMap<TString, THashMap<TString, TCountersByTask>> countersByStage;
for (const auto& entry : taskRunner.Entries) {
TString prefix, name;
@@ -86,15 +96,22 @@ void CollectTaskRunnerStatisticsByTask(NYson::TYsonWriter& writer, const TOperat
if (!NCommon::ParseCounterName(&prefix, &labels, &name, entry.Name)) {
continue;
}
- auto maybeInput = labels.find("Input");
- auto maybeOutput = labels.find("Output");
+ auto maybeInput = labels.find("InputChannel");
+ auto maybeOutput = labels.find("OutputChannel");
+ auto maybeSource = labels.find("Source");
+ auto maybeSink = labels.find("Sink");
auto maybeTask = labels.find("Task");
+ auto maybeStage = labels.find("Stage");
if (maybeTask == labels.end()) {
continue;
}
+ if (maybeStage == labels.end()) {
+ continue;
+ }
auto newEntry = entry; newEntry.Name = name;
+ auto& countersByTask = countersByStage[maybeStage->second];
auto& counters = countersByTask[maybeTask->second];
if (maybeInput != labels.end()) {
counters.Inputs[maybeInput->second].Entries.emplace_back(newEntry);
@@ -102,6 +119,12 @@ void CollectTaskRunnerStatisticsByTask(NYson::TYsonWriter& writer, const TOperat
if (maybeOutput != labels.end()) {
counters.Outputs[maybeOutput->second].Entries.emplace_back(newEntry);
}
+ if (maybeSource != labels.end()) {
+ counters.Sources[maybeSource->second].Entries.emplace_back(newEntry);
+ }
+ if (maybeSink != labels.end()) {
+ counters.Sinks[maybeSink->second].Entries.emplace_back(newEntry);
+ }
if (maybeInput == labels.end() && maybeOutput == labels.end()) {
counters.Generic.Entries.emplace_back(newEntry);
}
@@ -109,24 +132,25 @@ void CollectTaskRunnerStatisticsByTask(NYson::TYsonWriter& writer, const TOperat
writer.OnBeginMap();
- for (const auto& [task, counters] : countersByTask) {
- writer.OnKeyedItem("Task=" + task);
-
+ for (const auto& [stage, countersByTask] : countersByStage) {
+ writer.OnKeyedItem("Stage=" + stage);
writer.OnBeginMap();
- for (const auto& [input, stat] : counters.Inputs) {
- writer.OnKeyedItem("Input=" + input);
- NCommon::WriteStatistics(writer, stat);
- }
+ for (const auto& [task, counters] : countersByTask) {
+ writer.OnKeyedItem("Task=" + task);
- for (const auto& [output, stat] : counters.Outputs) {
- writer.OnKeyedItem("Output=" + output);
- NCommon::WriteStatistics(writer, stat);
- }
+ writer.OnBeginMap();
+
+ WriteEntries(writer, "Input=", counters.Inputs);
+ WriteEntries(writer, "Output=", counters.Outputs);
+ WriteEntries(writer, "Source=", counters.Sources);
+ WriteEntries(writer, "Sink=", counters.Sinks);
- writer.OnKeyedItem("Generic");
- NCommon::WriteStatistics(writer, counters.Generic);
+ writer.OnKeyedItem("Generic");
+ NCommon::WriteStatistics(writer, counters.Generic);
+ writer.OnEndMap();
+ }
writer.OnEndMap();
}