aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoraozeritsky <aozeritsky@ydb.tech>2023-06-26 19:53:49 +0300
committeraozeritsky <aozeritsky@ydb.tech>2023-06-26 19:53:49 +0300
commita2346cb456fe01d95d8f02ee2db0b77ca2c427f6 (patch)
tree4a3231386fffd90c87447164e3187ade77e21ae3
parent6c579bbd8c764857a7a200b32e7d1f95738a667b (diff)
downloadydb-a2346cb456fe01d95d8f02ee2db0b77ca2c427f6.tar.gz
Disable channel counters by default
-rw-r--r--ydb/library/yql/providers/dq/actors/task_controller_impl.h111
-rw-r--r--ydb/library/yql/providers/dq/common/yql_dq_settings.cpp1
-rw-r--r--ydb/library/yql/providers/dq/common/yql_dq_settings.h3
3 files changed, 57 insertions, 58 deletions
diff --git a/ydb/library/yql/providers/dq/actors/task_controller_impl.h b/ydb/library/yql/providers/dq/actors/task_controller_impl.h
index a982039833..a62e098de0 100644
--- a/ydb/library/yql/providers/dq/actors/task_controller_impl.h
+++ b/ydb/library/yql/providers/dq/actors/task_controller_impl.h
@@ -409,67 +409,62 @@ private:
// TaskStat.SetCounter(TaskStat.GetCounterName("TaskRunner", labels, "Total"), stats.GetFinishTs() - stats.GetStartTs());
// }
- for (const auto& stats : s.GetInputChannels()) {
- auto labels = commonLabels;
- labels["InputChannel"] = ToString(stats.GetChannelId());
- labels["SrcStageId"] = ToString(stats.GetSrcStageId());
-
- ADD_COUNTER(Chunks);
- ADD_COUNTER(Bytes);
- ADD_COUNTER(RowsIn);
- ADD_COUNTER(RowsOut);
- ADD_COUNTER(MaxMemoryUsage);
- ADD_COUNTER(DeserializationTimeUs);
-
- ADD_COUNTER(IdleTimeUs);
- ADD_COUNTER(WaitTimeUs);
- ADD_COUNTER(FirstMessageMs);
- ADD_COUNTER(LastMessageMs);
-
- if (stats.GetFirstMessageMs() && stats.GetLastMessageMs()) {
- TaskStat.SetCounter(TaskStat.GetCounterName("TaskRunner", labels, "ActiveTimeUs"),
- ( TInstant::MilliSeconds(stats.GetLastMessageMs()) -
- TInstant::MilliSeconds(stats.GetFirstMessageMs()) ).MicroSeconds()
- );
+ if (Settings->EnableChannelStats.Get().GetOrElse(TDqSettings::TDefault::EnableChannelStats))
+ {
+ for (const auto& stats : s.GetInputChannels()) {
+ auto labels = commonLabels;
+ labels["InputChannel"] = ToString(stats.GetChannelId());
+ labels["SrcStageId"] = ToString(stats.GetSrcStageId());
+
+ ADD_COUNTER(Chunks);
+ ADD_COUNTER(Bytes);
+ ADD_COUNTER(RowsIn);
+ ADD_COUNTER(RowsOut);
+ ADD_COUNTER(MaxMemoryUsage);
+ ADD_COUNTER(DeserializationTimeUs);
+
+ ADD_COUNTER(IdleTimeUs);
+ ADD_COUNTER(WaitTimeUs);
+ ADD_COUNTER(FirstMessageMs);
+ ADD_COUNTER(LastMessageMs);
+
+ if (stats.GetFirstMessageMs() && stats.GetLastMessageMs()) {
+ TaskStat.SetCounter(TaskStat.GetCounterName("TaskRunner", labels, "ActiveTimeUs"),
+ ( TInstant::MilliSeconds(stats.GetLastMessageMs()) -
+ TInstant::MilliSeconds(stats.GetFirstMessageMs()) ).MicroSeconds()
+ );
+ }
}
-// if (stats.GetFinishTs() >= stats.GetStartTs()) {
-// TaskStat.SetCounter(TaskStat.GetCounterName("TaskRunner", labels, "Total"), stats.GetFinishTs() - stats.GetStartTs());
-// }
- }
-
- for (const auto& stats : s.GetOutputChannels()) {
- auto labels = commonLabels;
- labels["OutputChannel"] = ToString(stats.GetChannelId());
- labels["DstStageId"] = ToString(stats.GetDstStageId());
-
- ADD_COUNTER(Chunks)
- ADD_COUNTER(Bytes);
- ADD_COUNTER(RowsIn);
- ADD_COUNTER(RowsOut);
- ADD_COUNTER(MaxMemoryUsage);
-
- ADD_COUNTER(SerializationTimeUs);
- ADD_COUNTER(BlockedByCapacity);
-
- ADD_COUNTER(SpilledBytes);
- ADD_COUNTER(SpilledRows);
- ADD_COUNTER(SpilledBlobs);
-
- ADD_COUNTER(BlockedTimeUs);
- ADD_COUNTER(FirstMessageMs);
- ADD_COUNTER(LastMessageMs);
-
- if (stats.GetFirstMessageMs() && stats.GetLastMessageMs()) {
- TaskStat.SetCounter(TaskStat.GetCounterName("TaskRunner", labels, "ActiveTimeUs"),
- ( TInstant::MilliSeconds(stats.GetLastMessageMs()) -
- TInstant::MilliSeconds(stats.GetFirstMessageMs()) ).MicroSeconds()
- );
+ for (const auto& stats : s.GetOutputChannels()) {
+ auto labels = commonLabels;
+ labels["OutputChannel"] = ToString(stats.GetChannelId());
+ labels["DstStageId"] = ToString(stats.GetDstStageId());
+
+ ADD_COUNTER(Chunks)
+ ADD_COUNTER(Bytes);
+ ADD_COUNTER(RowsIn);
+ ADD_COUNTER(RowsOut);
+ ADD_COUNTER(MaxMemoryUsage);
+
+ ADD_COUNTER(SerializationTimeUs);
+ ADD_COUNTER(BlockedByCapacity);
+
+ ADD_COUNTER(SpilledBytes);
+ ADD_COUNTER(SpilledRows);
+ ADD_COUNTER(SpilledBlobs);
+
+ ADD_COUNTER(BlockedTimeUs);
+ ADD_COUNTER(FirstMessageMs);
+ ADD_COUNTER(LastMessageMs);
+
+ if (stats.GetFirstMessageMs() && stats.GetLastMessageMs()) {
+ TaskStat.SetCounter(TaskStat.GetCounterName("TaskRunner", labels, "ActiveTimeUs"),
+ ( TInstant::MilliSeconds(stats.GetLastMessageMs()) -
+ TInstant::MilliSeconds(stats.GetFirstMessageMs()) ).MicroSeconds()
+ );
+ }
}
-
-// if (stats.GetFinishTs() >= stats.GetStartTs()) {
-// TaskStat.SetCounter(TaskStat.GetCounterName("TaskRunner", labels, "Total"), stats.GetFinishTs() - stats.GetStartTs());
-// }
}
for (const auto& stats : s.GetSources()) {
diff --git a/ydb/library/yql/providers/dq/common/yql_dq_settings.cpp b/ydb/library/yql/providers/dq/common/yql_dq_settings.cpp
index 5216f36477..30cd35eaa3 100644
--- a/ydb/library/yql/providers/dq/common/yql_dq_settings.cpp
+++ b/ydb/library/yql/providers/dq/common/yql_dq_settings.cpp
@@ -63,6 +63,7 @@ TDqConfiguration::TDqConfiguration() {
REGISTER_SETTING(*this, UseFastPickleTransport);
REGISTER_SETTING(*this, AggregateStatsByStage);
+ REGISTER_SETTING(*this, EnableChannelStats);
}
} // namespace NYql
diff --git a/ydb/library/yql/providers/dq/common/yql_dq_settings.h b/ydb/library/yql/providers/dq/common/yql_dq_settings.h
index 2474fd9a4c..76d12dd083 100644
--- a/ydb/library/yql/providers/dq/common/yql_dq_settings.h
+++ b/ydb/library/yql/providers/dq/common/yql_dq_settings.h
@@ -38,6 +38,7 @@ struct TDqSettings {
static constexpr ui32 HashShuffleMaxTasks = 24;
static constexpr bool UseFastPickleTransport = false;
static constexpr bool AggregateStatsByStage = true;
+ static constexpr bool EnableChannelStats = false;
};
using TPtr = std::shared_ptr<TDqSettings>;
@@ -98,6 +99,7 @@ struct TDqSettings {
NCommon::TConfSetting<bool, false> UseFastPickleTransport;
NCommon::TConfSetting<bool, false> AggregateStatsByStage;
+ NCommon::TConfSetting<bool, false> EnableChannelStats;
// This options will be passed to executor_actor and worker_actor
template <typename TProtoConfig>
@@ -147,6 +149,7 @@ struct TDqSettings {
SAVE_SETTING(UseWideChannels);
SAVE_SETTING(UseFastPickleTransport);
SAVE_SETTING(AggregateStatsByStage);
+ SAVE_SETTING(EnableChannelStats);
#undef SAVE_SETTING
}