aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoraozeritsky <aozeritsky@ydb.tech>2023-11-19 21:57:24 +0300
committeraozeritsky <aozeritsky@ydb.tech>2023-11-19 22:20:43 +0300
commitc0e09264adc48044d0fd225374245e9b06a4d78c (patch)
tree1b88ba485c4db554c3ae48566f8c6b61994ab149
parenta342b47017d029930f618e485b621baf68eab447 (diff)
downloadydb-c0e09264adc48044d0fd225374245e9b06a4d78c.tar.gz
Disable profile stats by default, disable incremental stats
-rw-r--r--ydb/library/yql/providers/dq/runtime/task_command_executor.cpp48
1 files changed, 31 insertions, 17 deletions
diff --git a/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp b/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp
index 7811c075f7..06ca141aa7 100644
--- a/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp
+++ b/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp
@@ -141,15 +141,21 @@ public:
*Runner->GetStats(),
Runner->GetTaskId(), StageId);
- auto stats = Runner->GetStats();
- for (const auto& [stageId, stageChannels] : stats->OutputChannels) {
- for (const auto& [id, channel] : stageChannels) {
- UpdateOutputChannelStats(channel);
+ auto statsMode = DqConfiguration->TaskRunnerStats.Get().GetOrElse(TDqSettings::TDefault::TaskRunnerStats);
+ if (statsMode == TDqSettings::ETaskRunnerStats::Profile) {
+ auto stats = Runner->GetStats();
+ for (const auto& [stageId, stageChannels] : stats->OutputChannels) {
+ for (const auto& [id, channel] : stageChannels) {
+ UpdateOutputChannelStats(channel);
+ }
}
- }
- for (const auto& [stageId, stageChannels] : stats->InputChannels) {
- for (const auto& [id, channel] : stageChannels) {
- UpdateInputChannelStats(channel);
+ for (const auto& [stageId, stageChannels] : stats->InputChannels) {
+ for (const auto& [id, channel] : stageChannels) {
+ UpdateInputChannelStats(channel);
+ }
+ }
+ for (const auto& [sourceId, source] : stats->Sources) {
+ UpdateSourceStats(source);
}
}
}
@@ -345,7 +351,6 @@ public:
auto guard = Runner->BindAllocator(0); // Explicitly reset memory limit
channel->Push(std::move(data));
- UpdateInputChannelStats(channel);
break;
}
case NDqProto::TCommandHeader::PUSH_SOURCE: {
@@ -368,7 +373,6 @@ public:
dataSerializer.Deserialize(std::move(batch), source->GetInputType(), buffer);
source->Push(std::move(buffer), request.GetSpace());
- UpdateSourceStats(source);
break;
}
case NDqProto::TCommandHeader::FINISH: {
@@ -376,7 +380,6 @@ public:
Y_ENSURE(taskId == Runner->GetTaskId());
auto channel = Runner->GetInputChannel(channelId);
channel->Finish();
- UpdateInputChannelStats(channel);
break;
}
case NDqProto::TCommandHeader::FINISH_OUTPUT: {
@@ -384,7 +387,6 @@ public:
Y_ENSURE(taskId == Runner->GetTaskId());
auto channel = Runner->GetOutputChannel(channelId);
channel->Finish();
- UpdateOutputChannelStats(channel);
break;
}
case NDqProto::TCommandHeader::FINISH_SOURCE: {
@@ -392,7 +394,6 @@ public:
Y_ENSURE(taskId == Runner->GetTaskId());
auto source = Runner->GetSource(channelId);
source->Finish();
- UpdateSourceStats(source);
break;
}
case NDqProto::TCommandHeader::DROP_OUTPUT: {
@@ -410,7 +411,6 @@ public:
Y_ENSURE(taskId == Runner->GetTaskId());
auto channel = Runner->GetOutputChannel(channelId);
channel->Terminate();
- UpdateOutputChannelStats(channel);
break;
}
case NDqProto::TCommandHeader::GET_STORED_BYTES: {
@@ -447,7 +447,6 @@ public:
response.SetResult(channel->Pop(batch));
bool isOOB = batch.IsOOB();
*response.MutableData() = std::move(batch.Proto);
- UpdateOutputChannelStats(channel);
TTaskCounters deltaStat;
deltaStat.TakeDeltaCounters(QueryStat, PrevStat);
@@ -480,7 +479,9 @@ public:
NDqProto::TRunResponse response;
auto status = Runner->Run();
response.SetResult(static_cast<ui32>(status));
- UpdateStats(response);
+ if (status == NDq::ERunStatus::Finished) {
+ UpdateStats(response);
+ }
response.Save(&output);
} catch (const NKikimr::TMemoryLimitExceededException& ex) {
throw yexception() << "DQ computation exceeds the memory limit " << DqConfiguration->MemoryLimit.Get().GetOrElse(0) << ". Try to increase the limit using PRAGMA dq.MemoryLimit";
@@ -694,7 +695,20 @@ public:
QueryStat.Measure<void>("MakeDqTaskRunner", [&]() {
NDq::TDqTaskRunnerSettings settings;
- settings.StatsMode = NYql::NDqProto::DQ_STATS_MODE_PROFILE;
+ auto statsMode = DqConfiguration->TaskRunnerStats.Get().GetOrElse(TDqSettings::TDefault::TaskRunnerStats);
+ settings.StatsMode = NYql::NDqProto::DQ_STATS_MODE_NONE;
+ switch (statsMode) {
+ case TDqSettings::ETaskRunnerStats::Basic:
+ settings.StatsMode = NYql::NDqProto::DQ_STATS_MODE_BASIC;
+ break;
+ case TDqSettings::ETaskRunnerStats::Full:
+ settings.StatsMode = NYql::NDqProto::DQ_STATS_MODE_FULL;
+ break;
+ case TDqSettings::ETaskRunnerStats::Profile:
+ settings.StatsMode = NYql::NDqProto::DQ_STATS_MODE_PROFILE;
+ break;
+ default: break;
+ }
settings.TerminateOnError = TerminateOnError;
for (const auto& x: taskMeta.GetSecureParams()) {
settings.SecureParams[x.first] = x.second;