diff options
author | aozeritsky <aozeritsky@ydb.tech> | 2023-11-19 21:57:24 +0300 |
---|---|---|
committer | aozeritsky <aozeritsky@ydb.tech> | 2023-11-19 22:20:43 +0300 |
commit | c0e09264adc48044d0fd225374245e9b06a4d78c (patch) | |
tree | 1b88ba485c4db554c3ae48566f8c6b61994ab149 | |
parent | a342b47017d029930f618e485b621baf68eab447 (diff) | |
download | ydb-c0e09264adc48044d0fd225374245e9b06a4d78c.tar.gz |
Disable profile stats by default, disable incremental stats
-rw-r--r-- | ydb/library/yql/providers/dq/runtime/task_command_executor.cpp | 48 |
1 files changed, 31 insertions, 17 deletions
diff --git a/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp b/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp index 7811c075f7..06ca141aa7 100644 --- a/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp +++ b/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp @@ -141,15 +141,21 @@ public: *Runner->GetStats(), Runner->GetTaskId(), StageId); - auto stats = Runner->GetStats(); - for (const auto& [stageId, stageChannels] : stats->OutputChannels) { - for (const auto& [id, channel] : stageChannels) { - UpdateOutputChannelStats(channel); + auto statsMode = DqConfiguration->TaskRunnerStats.Get().GetOrElse(TDqSettings::TDefault::TaskRunnerStats); + if (statsMode == TDqSettings::ETaskRunnerStats::Profile) { + auto stats = Runner->GetStats(); + for (const auto& [stageId, stageChannels] : stats->OutputChannels) { + for (const auto& [id, channel] : stageChannels) { + UpdateOutputChannelStats(channel); + } } - } - for (const auto& [stageId, stageChannels] : stats->InputChannels) { - for (const auto& [id, channel] : stageChannels) { - UpdateInputChannelStats(channel); + for (const auto& [stageId, stageChannels] : stats->InputChannels) { + for (const auto& [id, channel] : stageChannels) { + UpdateInputChannelStats(channel); + } + } + for (const auto& [sourceId, source] : stats->Sources) { + UpdateSourceStats(source); } } } @@ -345,7 +351,6 @@ public: auto guard = Runner->BindAllocator(0); // Explicitly reset memory limit channel->Push(std::move(data)); - UpdateInputChannelStats(channel); break; } case NDqProto::TCommandHeader::PUSH_SOURCE: { @@ -368,7 +373,6 @@ public: dataSerializer.Deserialize(std::move(batch), source->GetInputType(), buffer); source->Push(std::move(buffer), request.GetSpace()); - UpdateSourceStats(source); break; } case NDqProto::TCommandHeader::FINISH: { @@ -376,7 +380,6 @@ public: Y_ENSURE(taskId == Runner->GetTaskId()); auto channel = Runner->GetInputChannel(channelId); channel->Finish(); - UpdateInputChannelStats(channel); break; } case NDqProto::TCommandHeader::FINISH_OUTPUT: { @@ -384,7 +387,6 @@ public: Y_ENSURE(taskId == Runner->GetTaskId()); auto channel = Runner->GetOutputChannel(channelId); channel->Finish(); - UpdateOutputChannelStats(channel); break; } case NDqProto::TCommandHeader::FINISH_SOURCE: { @@ -392,7 +394,6 @@ public: Y_ENSURE(taskId == Runner->GetTaskId()); auto source = Runner->GetSource(channelId); source->Finish(); - UpdateSourceStats(source); break; } case NDqProto::TCommandHeader::DROP_OUTPUT: { @@ -410,7 +411,6 @@ public: Y_ENSURE(taskId == Runner->GetTaskId()); auto channel = Runner->GetOutputChannel(channelId); channel->Terminate(); - UpdateOutputChannelStats(channel); break; } case NDqProto::TCommandHeader::GET_STORED_BYTES: { @@ -447,7 +447,6 @@ public: response.SetResult(channel->Pop(batch)); bool isOOB = batch.IsOOB(); *response.MutableData() = std::move(batch.Proto); - UpdateOutputChannelStats(channel); TTaskCounters deltaStat; deltaStat.TakeDeltaCounters(QueryStat, PrevStat); @@ -480,7 +479,9 @@ public: NDqProto::TRunResponse response; auto status = Runner->Run(); response.SetResult(static_cast<ui32>(status)); - UpdateStats(response); + if (status == NDq::ERunStatus::Finished) { + UpdateStats(response); + } response.Save(&output); } catch (const NKikimr::TMemoryLimitExceededException& ex) { throw yexception() << "DQ computation exceeds the memory limit " << DqConfiguration->MemoryLimit.Get().GetOrElse(0) << ". Try to increase the limit using PRAGMA dq.MemoryLimit"; @@ -694,7 +695,20 @@ public: QueryStat.Measure<void>("MakeDqTaskRunner", [&]() { NDq::TDqTaskRunnerSettings settings; - settings.StatsMode = NYql::NDqProto::DQ_STATS_MODE_PROFILE; + auto statsMode = DqConfiguration->TaskRunnerStats.Get().GetOrElse(TDqSettings::TDefault::TaskRunnerStats); + settings.StatsMode = NYql::NDqProto::DQ_STATS_MODE_NONE; + switch (statsMode) { + case TDqSettings::ETaskRunnerStats::Basic: + settings.StatsMode = NYql::NDqProto::DQ_STATS_MODE_BASIC; + break; + case TDqSettings::ETaskRunnerStats::Full: + settings.StatsMode = NYql::NDqProto::DQ_STATS_MODE_FULL; + break; + case TDqSettings::ETaskRunnerStats::Profile: + settings.StatsMode = NYql::NDqProto::DQ_STATS_MODE_PROFILE; + break; + default: break; + } settings.TerminateOnError = TerminateOnError; for (const auto& x: taskMeta.GetSecureParams()) { settings.SecureParams[x.first] = x.second; |