diff options
author | whcrc <whcrc@ydb.tech> | 2022-11-03 17:57:35 +0300 |
---|---|---|
committer | whcrc <whcrc@ydb.tech> | 2022-11-03 17:57:35 +0300 |
commit | 9af9d54be754d0ad4e4de11f5c5aeccd3274c92b (patch) | |
tree | 94bef3b683c0d7232b6ffc172115fac6d6734976 | |
parent | c30a953bfdd09f1ef4ef16997dab04c664a182b5 (diff) | |
download | ydb-9af9d54be754d0ad4e4de11f5c5aeccd3274c92b.tar.gz |
, stats aggregation in async CA, fixes [ProcessInit, MakeDqTaskRunner] counters
11 files changed, 82 insertions, 48 deletions
diff --git a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp index 4ae2f85225e..2c2dda745ed 100644 --- a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp +++ b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp @@ -39,7 +39,8 @@ public: const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, const NTaskRunnerActor::ITaskRunnerActorFactory::TPtr& taskRunnerActorFactory, const ::NMonitoring::TDynamicCounterPtr& taskCounters, - const TActorId& quoterServiceActorId) + const TActorId& quoterServiceActorId, + bool ownCounters) : TBase(executerId, txId, std::move(task), std::move(asyncIoFactory), functionRegistry, settings, memoryLimits, /* ownMemoryQuota = */ false, false, taskCounters) , TaskRunnerActorFactory(taskRunnerActorFactory) , ReadyToCheckpointFlag(false) @@ -47,6 +48,10 @@ public: , QuoterServiceActorId(quoterServiceActorId) { InitExtraMonCounters(taskCounters); + if (ownCounters) { + CA_LOG_D("TDqAsyncComputeActor, make stat"); + Stat = MakeHolder<NYql::TCounters>(); + } } void DoBootstrap() { @@ -429,7 +434,9 @@ private: const auto& taskParams = ev->Get()->TaskParams; const auto& typeEnv = ev->Get()->TypeEnv; const auto& holderFactory = ev->Get()->HolderFactory; - + if (Stat) { + Stat->AddCounters2(ev->Get()->Sensors); + } TypeEnv = const_cast<NKikimr::NMiniKQL::TTypeEnvironment*>(&typeEnv); FillIoMaps(holderFactory, typeEnv, secureParams, taskParams); @@ -457,6 +464,9 @@ private: } void OnRunFinished(NTaskRunnerActor::TEvTaskRunFinished::TPtr& ev, const NActors::TActorContext& ) { + if (Stat) { + Stat->AddCounters2(ev->Get()->Sensors); + } ContinueRunInflight = false; TrySendAsyncChannelsData(); // send from previous cycle @@ -533,10 +543,9 @@ private: } void OnPopFinished(NTaskRunnerActor::TEvChannelPopFinished::TPtr& ev, const NActors::TActorContext&) { - if (ev->Get()->Stats) { - TaskRunnerStats = std::move(ev->Get()->Stats); + if (Stat) { + Stat->AddCounters2(ev->Get()->Sensors); } - CA_LOG_T("OnPopFinished, stats: " << *TaskRunnerStats.Get()); auto it = OutputChannelsMap.find(ev->Get()->ChannelId); Y_VERIFY(it != OutputChannelsMap.end()); TOutputChannelInfo& outputChannel = it->second; @@ -886,10 +895,11 @@ IActor* CreateDqAsyncComputeActor(const TActorId& executerId, const TTxId& txId, const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, const NTaskRunnerActor::ITaskRunnerActorFactory::TPtr& taskRunnerActorFactory, ::NMonitoring::TDynamicCounterPtr taskCounters, - const TActorId& quoterServiceActorId) + const TActorId& quoterServiceActorId, + bool ownCounters) { return new TDqAsyncComputeActor(executerId, txId, std::move(task), std::move(asyncIoFactory), - functionRegistry, settings, memoryLimits, taskRunnerActorFactory, taskCounters, quoterServiceActorId); + functionRegistry, settings, memoryLimits, taskRunnerActorFactory, taskCounters, quoterServiceActorId, ownCounters); } } // namespace NDq diff --git a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.h b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.h index 16de5b51b13..2cc261b8657 100644 --- a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.h +++ b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.h @@ -23,7 +23,8 @@ NActors::IActor* CreateDqAsyncComputeActor(const NActors::TActorId& executerId, const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, const NTaskRunnerActor::ITaskRunnerActorFactory::TPtr& taskRunnerActorFactory, ::NMonitoring::TDynamicCounterPtr taskCounters = nullptr, - const NActors::TActorId& quoterServiceActorId = {}); + const NActors::TActorId& quoterServiceActorId = {}, + bool ownCounters = false); } // namespace NDq } // namespace NYql diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h index eb2887d1604..7e583a8ed87 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h @@ -13,6 +13,7 @@ #include <ydb/core/base/wilson.h> #include <ydb/core/protos/services.pb.h> +#include <ydb/library/yql/providers/dq/counters/counters.h> #include <ydb/library/yql/dq/actors/protos/dq_events.pb.h> #include <ydb/library/yql/dq/common/dq_common.h> #include <ydb/library/yql/dq/proto/dq_tasks.pb.h> @@ -1853,7 +1854,21 @@ public: dst->SetMkqlExtraMemoryRequests(GetProfileStats()->MkqlExtraMemoryRequests); } - if (auto* taskStats = GetTaskRunnerStats()) { + if (Stat) { // for task_runner_actor + Y_VERIFY(!dst->HasExtra()); + NDqProto::TExtraStats extraStats; + for (const auto& [name, entry]: Stat->Get()) { + NDqProto::TDqStatsAggr metric; + metric.SetSum(entry.Sum); + metric.SetMax(entry.Max); + metric.SetMin(entry.Min); + //metric.SetAvg(entry.Avg); + metric.SetCnt(entry.Count); + (*extraStats.MutableStats())[name] = metric; + } + dst->MutableExtra()->PackFrom(extraStats); + Stat->Clear(); + } else if (auto* taskStats = GetTaskRunnerStats()) { // for task_runner_actor_local auto* protoTask = dst->AddTasks(); FillTaskRunnerStats(Task.GetId(), Task.GetStageId(), *taskStats, protoTask, (bool) GetProfileStats()); @@ -2011,6 +2026,7 @@ protected: bool MonCountersProvided = false; ::NMonitoring::TDynamicCounters::TCounterPtr MkqlMemoryUsage; ::NMonitoring::TDynamicCounters::TCounterPtr MkqlMemoryLimit; + THolder<NYql::TCounters> Stat; }; } // namespace NYql diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_stats.cpp b/ydb/library/yql/dq/actors/compute/dq_compute_actor_stats.cpp index f6bcd7a92f8..4de25a1f9a0 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_stats.cpp +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_stats.cpp @@ -12,7 +12,6 @@ void FillTaskRunnerStats(ui64 taskId, ui32 stageId, const TTaskRunnerStatsBase& protoTask->SetStageId(stageId); protoTask->SetCpuTimeUs(taskStats.ComputeCpuTime.MicroSeconds() + taskStats.BuildCpuTime.MicroSeconds()); protoTask->SetFinishTimeMs(taskStats.FinishTs.MilliSeconds()); - protoTask->SetProcessInit(taskStats.ProcessInit.MilliSeconds()); // Cerr << (TStringBuilder() << "FillTaskRunnerStats: " << taskStats.ComputeCpuTime << ", " << taskStats.BuildCpuTime << Endl); if (Y_UNLIKELY(withProfileStats)) { diff --git a/ydb/library/yql/dq/actors/protos/dq_stats.proto b/ydb/library/yql/dq/actors/protos/dq_stats.proto index 280bc90a935..1ddaaabfbac 100644 --- a/ydb/library/yql/dq/actors/protos/dq_stats.proto +++ b/ydb/library/yql/dq/actors/protos/dq_stats.proto @@ -107,6 +107,8 @@ message TDqMkqlStat { } message TDqTaskStats { + reserved 154; + // basic stats uint64 TaskId = 1; uint32 StageId = 2; @@ -140,7 +142,6 @@ message TDqTaskStats { repeated TDqInputChannelStats InputChannels = 151; repeated TDqAsyncOutputBufferStats Sinks = 152; repeated TDqOutputChannelStats OutputChannels = 153; - uint64 ProcessInit = 154; google.protobuf.Any Extra = 200; } @@ -166,6 +167,10 @@ message TDqStatsAggr { uint64 Cnt = 4; } +message TExtraStats { + map<string, TDqStatsAggr> Stats = 1; +} + message TDqStatsMinMax { uint64 Min = 1; uint64 Max = 2; diff --git a/ydb/library/yql/dq/runtime/dq_tasks_runner.h b/ydb/library/yql/dq/runtime/dq_tasks_runner.h index 24b9ad5c47c..433f46edb08 100644 --- a/ydb/library/yql/dq/runtime/dq_tasks_runner.h +++ b/ydb/library/yql/dq/runtime/dq_tasks_runner.h @@ -75,7 +75,6 @@ struct TTaskRunnerStatsBase { TDuration ComputeCpuTime; TRunStatusTimeMetrics RunStatusTimeMetrics; // ComputeCpuTime + RunStatusTimeMetrics == 100% time - TDuration ProcessInit; // profile stats TDuration WaitTime; // wall time of waiting for input, scans & output @@ -184,24 +183,13 @@ class TDqTaskRunnerStatsView { public: TDqTaskRunnerStatsView() : IsDefined(false) {} - TDqTaskRunnerStatsView(TDqTaskRunnerStatsInplace&& stats) // used in TTaskRunnerActor, cause it constructs stats on-the-fly, and cannot own it due to threaded implementation - : StatsInplace(std::move(stats)) - , StatsPtr(nullptr) - , IsInplace(true) - , IsDefined(true) { - } - TDqTaskRunnerStatsView(const TDqTaskRunnerStats* stats) // used in TLocalTaskRunnerActor, cause it holds this stats, and does not modify it asyncronously from TDqAsyncComputeActor - : StatsInplace() - , StatsPtr(stats) - , IsInplace(false) + : StatsPtr(stats) , IsDefined(true) { } TDqTaskRunnerStatsView(const TDqTaskRunnerStats* stats, THashMap<ui32, const TDqAsyncOutputBufferStats*>&& sinkStats) - : StatsInplace() - , StatsPtr(stats) - , IsInplace(false) + : StatsPtr(stats) , IsDefined(true) , SinkStats(std::move(sinkStats)) { } @@ -210,7 +198,7 @@ public: if (!IsDefined) { return nullptr; } - return IsInplace ? static_cast<const TTaskRunnerStatsBase*>(&StatsInplace) : StatsPtr; + return StatsPtr; } operator bool() const { @@ -222,9 +210,7 @@ public: } private: - TDqTaskRunnerStatsInplace StatsInplace; const TDqTaskRunnerStats* StatsPtr; - bool IsInplace; bool IsDefined; THashMap<ui32, const TDqAsyncOutputBufferStats*> SinkStats; }; diff --git a/ydb/library/yql/providers/dq/actors/compute_actor.cpp b/ydb/library/yql/providers/dq/actors/compute_actor.cpp index 64faded43b2..0508c9a964c 100644 --- a/ydb/library/yql/providers/dq/actors/compute_actor.cpp +++ b/ydb/library/yql/providers/dq/actors/compute_actor.cpp @@ -83,7 +83,8 @@ IActor* CreateComputeActor( memoryLimits, taskRunnerActorFactory, taskCounters, - options.QuoterServiceActorId); + options.QuoterServiceActorId, + options.ComputeActorOwnsCounters); } } diff --git a/ydb/library/yql/providers/dq/actors/task_controller.cpp b/ydb/library/yql/providers/dq/actors/task_controller.cpp index 7d3d874e69c..fc431d99cf2 100644 --- a/ydb/library/yql/providers/dq/actors/task_controller.cpp +++ b/ydb/library/yql/providers/dq/actors/task_controller.cpp @@ -130,7 +130,11 @@ private: << " PingCookie: " << ev->Cookie << " StatusCode: " << NYql::NDqProto::StatusIds_StatusCode_Name(state.GetStatusCode()); - if (state.HasStats() && state.GetStats().GetTasks().size()) { + if (state.HasStats() && TryAddStatsFromExtra(state.GetStats())) { + if (ServiceCounters.Counters && !AggrPeriod) { + ExportStats(TaskStat, taskId); + } + } else if (state.HasStats() && state.GetStats().GetTasks().size()) { YQL_CLOG(TRACE, ProviderDq) << " " << SelfId() << " AddStats " << taskId; AddStats(state.GetStats()); if (ServiceCounters.Counters && !AggrPeriod) { @@ -138,6 +142,7 @@ private: } } + TIssues localIssues; // TODO: don't convert issues to string NYql::IssuesFromMessage(state.GetIssues(), localIssues); @@ -278,6 +283,24 @@ private: } } + bool TryAddStatsFromExtra(const NDqProto::TDqComputeActorStats& x) { + NDqProto::TExtraStats extraStats; + if (x.HasExtra() && x.GetExtra().UnpackTo(&extraStats)) { + YQL_CLOG(TRACE, ProviderDq) << " " << SelfId() << " AddStats from extra"; + for (const auto& [name, m] : extraStats.GetStats()) { + NYql::TCounters::TEntry value; + value.Sum = m.GetSum(); + value.Max = m.GetMax(); + value.Min = m.GetMin(); + //value.Avg = m.GetAvg(); + value.Count = m.GetCnt(); + TaskStat.AddCounter(name, value); + } + return true; + } + return false; + } + void AddStats(const NDqProto::TDqComputeActorStats& x) { YQL_ENSURE(x.GetTasks().size() == 1); auto& s = x.GetTasks(0); @@ -303,10 +326,6 @@ private: ADD_COUNTER(InputBytes) ADD_COUNTER(OutputRows) ADD_COUNTER(OutputBytes) - if (stats.GetProcessInit()) { - const auto val = static_cast<i64>(stats.GetProcessInit()); - TaskStat.AddCounter(TaskStat.GetCounterName("TaskRunner", labels, "ProcessInit"), NYql::TCounters::TEntry{val, val, val, val, val}); - } // profile stats ADD_COUNTER(BuildCpuTimeUs) diff --git a/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp b/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp index e7b9a2c6cb9..9d5c1bf3633 100644 --- a/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp +++ b/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp @@ -129,6 +129,12 @@ public: *Runner->GetStats(), CurrentStats, Runner->GetTaskId()); + for (const auto& [inputId, _]: CurrentInputChannelsStats) { + UpdateInputChannelStats(inputId); + } + for (const auto& [outputId, _]: CurrentOutputChannelsStats) { + UpdateOutputChannelStats(outputId); + } } void UpdateOutputChannelStats(ui64 channelId) diff --git a/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp b/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp index b8f8741e8eb..7317ee4abcb 100644 --- a/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp +++ b/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp @@ -293,7 +293,7 @@ private: auto cookie = ev->Cookie; auto wasFinished = ev->Get()->WasFinished; auto toPop = ev->Get()->Size; - Invoker->Invoke([cookie,selfId,channelId=ev->Get()->ChannelId, actorSystem, replyTo, wasFinished, toPop, taskRunner=TaskRunner, settings=Settings, stageId=StageId, processInit=this->ProcessInit]() { + Invoker->Invoke([cookie,selfId,channelId=ev->Get()->ChannelId, actorSystem, replyTo, wasFinished, toPop, taskRunner=TaskRunner, settings=Settings, stageId=StageId]() { try { // auto guard = taskRunner->BindAllocator(); // only for local mode auto channel = taskRunner->GetOutputChannel(channelId); @@ -311,11 +311,10 @@ private: } TVector<NDqProto::TData> chunks; - NDqProto::TPopResponse lastPop; NDqProto::TPopResponse response; for (;maxChunks && remain > 0 && !isFinished && hasData; maxChunks--, remain -= dataSize) { NDqProto::TData data; - lastPop = std::move(channel->Pop(data, remain)); + const auto lastPop = std::move(channel->Pop(data, remain)); for (auto& metric : lastPop.GetMetric()) { *response.AddMetric() = metric; @@ -332,12 +331,6 @@ private: } } - TDqTaskRunnerStatsInplace stats; - NDqProto::TGetStatsResponse pbStats; - lastPop.GetStats().UnpackTo(&pbStats); - stats.FromProto(pbStats.GetStats()); - stats.ProcessInit = processInit; - actorSystem->Send( new IEventHandle( replyTo, @@ -349,8 +342,7 @@ private: Nothing(), isFinished, changed, - GetSensors(response), - TDqTaskRunnerStatsView(std::move(stats))), + GetSensors(response)), /*flags=*/0, cookie)); } catch (...) { @@ -470,7 +462,7 @@ private: Settings->FreezeDefaults(); StageId = taskMeta.GetStageId(); } - Invoker->Invoke([taskRunner=TaskRunner, replyTo, selfId, cookie, actorSystem, settings=Settings, stageId=StageId, startTime, clusterName = ClusterName,this_=this](){ + Invoker->Invoke([taskRunner=TaskRunner, replyTo, selfId, cookie, actorSystem, settings=Settings, stageId=StageId, startTime, clusterName = ClusterName](){ try { //auto guard = taskRunner->BindAllocator(); // only for local mode auto result = taskRunner->Prepare(); @@ -481,7 +473,6 @@ private: "ProcessInit"); i64 val = (TInstant::Now()-startTime).MilliSeconds(); sensors.push_back({sensorName, val, val, val, val, 1}); - this_->ProcessInit = TDuration::MilliSeconds(val); auto event = MakeHolder<TEvTaskRunnerCreateFinished>( taskRunner->GetSecureParams(), @@ -581,7 +572,6 @@ private: ui64 StageId; TWorkerRuntimeData* RuntimeData; TString ClusterName; - TDuration ProcessInit = TDuration::Zero(); }; class TTaskRunnerActorFactory: public ITaskRunnerActorFactory { diff --git a/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.h b/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.h index f49e2c01ac5..7aacd11b261 100644 --- a/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.h +++ b/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.h @@ -38,6 +38,7 @@ namespace NYql::NDqs { bool CanUseComputeActor = true; NActors::TActorId QuoterServiceActorId; + bool ComputeActorOwnsCounters = false; }; NActors::IActor* CreateLocalWorkerManager(const TLocalWorkerManagerOptions& options); |