diff options
author | whcrc <whcrc@yandex-team.ru> | 2022-03-14 19:14:46 +0300 |
---|---|---|
committer | whcrc <whcrc@yandex-team.ru> | 2022-03-14 19:14:46 +0300 |
commit | ca1ef1496bb884724f4b349b7d6a6a825755c405 (patch) | |
tree | 92568b4328d46fd308916f0c24d0c7b39fb1a00d | |
parent | 64b019f62d1da1b2772cca9b9d068a4fbd634378 (diff) | |
download | ydb-ca1ef1496bb884724f4b349b7d6a6a825755c405.tar.gz |
YQL-14358: add statistics to dq_async_compute_actor
ref:fafcbbd6fa1bc80e8022c0b529ea99dd682dbbba
15 files changed, 314 insertions, 178 deletions
diff --git a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp index 72346aaff11..e200fef0b00 100644 --- a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp +++ b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp @@ -289,6 +289,10 @@ private: } void OnPopFinished(NTaskRunnerActor::TEvChannelPopFinished::TPtr& ev, const NActors::TActorContext&) { + if (ev->Get()->Stats) { + TaskRunnerStats = std::move(ev->Get()->Stats); + } + CA_LOG_D("OnPopFinished, stats: " << *TaskRunnerStats.Get()); auto channelId = ev->Get()->ChannelId; auto finished = ev->Get()->Finished; auto dataWasSent = ev->Get()->Changed; @@ -385,6 +389,10 @@ private: CheckRunStatus(); } + const NYql::NDq::TTaskRunnerStatsBase* GetTaskRunnerStats() override { + return TaskRunnerStats.Get(); + } + NKikimr::NMiniKQL::TTypeEnvironment* TypeEnv; NTaskRunnerActor::ITaskRunnerActor* TaskRunnerActor; NActors::TActorId TaskRunnerActorId; @@ -403,6 +411,7 @@ private: }; THashMap<ui64, TTakeInputChannelData> TakeInputChannelDataRequests; ui64 Cookie = 0; + NDq::TDqTaskRunnerStatsView TaskRunnerStats; }; diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor.h index a3afe7a55b1..d03d1e68f52 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor.h @@ -253,7 +253,7 @@ using TTaskRunnerFactory = std::function< TIntrusivePtr<IDqTaskRunner>(const NDqProto::TDqTask& task, const TLogFunc& logFunc) >; -void FillTaskRunnerStats(ui64 taskId, ui32 stageId, const TDqTaskRunnerStats& taskStats, +void FillTaskRunnerStats(ui64 taskId, ui32 stageId, const TTaskRunnerStatsBase& taskStats, NDqProto::TDqTaskStats* protoTask, bool withProfileStats); NActors::IActor* CreateDqComputeActor(const NActors::TActorId& executerId, const TTxId& txId, NDqProto::TDqTask&& task, diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h index c90b8175346..c9f58160f6d 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h @@ -1347,6 +1347,14 @@ private: } } + virtual const NYql::NDq::TTaskRunnerStatsBase* GetTaskRunnerStats() { + if (!TaskRunner) { + return nullptr; + } + TaskRunner->UpdateStats(); + return TaskRunner->GetStats(); + } + void FillStats(NDqProto::TDqComputeActorStats* dst, bool last) { if (!BasicStats) { return; @@ -1364,58 +1372,54 @@ private: dst->SetMkqlExtraMemoryRequests(ProfileStats->MkqlExtraMemoryRequests); } - if (TaskRunner) { - TaskRunner->UpdateStats(); + if (auto* taskStats = GetTaskRunnerStats()) { + auto* protoTask = dst->AddTasks(); + FillTaskRunnerStats(Task.GetId(), Task.GetStageId(), *taskStats, protoTask, (bool) ProfileStats); - if (auto* taskStats = TaskRunner->GetStats()) { - auto* protoTask = dst->AddTasks(); - FillTaskRunnerStats(Task.GetId(), Task.GetStageId(), *taskStats, protoTask, (bool) ProfileStats); + for (auto& [outputIndex, sinkInfo] : SinksMap) { + if (auto* sinkStats = sinkInfo.Sink ? sinkInfo.Sink->GetStats() : nullptr) { + protoTask->SetOutputRows(protoTask->GetOutputRows() + sinkStats->RowsIn); + protoTask->SetOutputBytes(protoTask->GetOutputBytes() + sinkStats->Bytes); - for (auto& [outputIndex, sinkInfo] : SinksMap) { - if (auto* sinkStats = sinkInfo.Sink ? sinkInfo.Sink->GetStats() : nullptr) { - protoTask->SetOutputRows(protoTask->GetOutputRows() + sinkStats->RowsIn); - protoTask->SetOutputBytes(protoTask->GetOutputBytes() + sinkStats->Bytes); + if (ProfileStats) { + auto* protoSink = protoTask->AddSinks(); + protoSink->SetOutputIndex(outputIndex); - if (ProfileStats) { - auto* protoSink = protoTask->AddSinks(); - protoSink->SetOutputIndex(outputIndex); + protoSink->SetChunks(sinkStats->Chunks); + protoSink->SetBytes(sinkStats->Bytes); + protoSink->SetRowsIn(sinkStats->RowsIn); + protoSink->SetRowsOut(sinkStats->RowsOut); - protoSink->SetChunks(sinkStats->Chunks); - protoSink->SetBytes(sinkStats->Bytes); - protoSink->SetRowsIn(sinkStats->RowsIn); - protoSink->SetRowsOut(sinkStats->RowsOut); - - protoSink->SetMaxMemoryUsage(sinkStats->MaxMemoryUsage); - protoSink->SetErrorsCount(sinkInfo.IssuesBuffer.GetAllAddedIssuesCount()); - } + protoSink->SetMaxMemoryUsage(sinkStats->MaxMemoryUsage); + protoSink->SetErrorsCount(sinkInfo.IssuesBuffer.GetAllAddedIssuesCount()); } } + } - if (ProfileStats) { - for (auto& protoSource : *protoTask->MutableSources()) { - if (auto* sourceInfo = SourcesMap.FindPtr(protoSource.GetInputIndex())) { - protoSource.SetErrorsCount(sourceInfo->IssuesBuffer.GetAllAddedIssuesCount()); - } + if (ProfileStats) { + for (auto& protoSource : *protoTask->MutableSources()) { + if (auto* sourceInfo = SourcesMap.FindPtr(protoSource.GetInputIndex())) { + protoSource.SetErrorsCount(sourceInfo->IssuesBuffer.GetAllAddedIssuesCount()); } + } - for (auto& protoInputChannelStats : *protoTask->MutableInputChannels()) { - if (auto* caChannelStats = Channels->GetInputChannelStats(protoInputChannelStats.GetChannelId())) { - protoInputChannelStats.SetPollRequests(caChannelStats->PollRequests); - protoInputChannelStats.SetWaitTimeUs(caChannelStats->WaitTime.MicroSeconds()); - protoInputChannelStats.SetResentMessages(caChannelStats->ResentMessages); - } + for (auto& protoInputChannelStats : *protoTask->MutableInputChannels()) { + if (auto* caChannelStats = Channels->GetInputChannelStats(protoInputChannelStats.GetChannelId())) { + protoInputChannelStats.SetPollRequests(caChannelStats->PollRequests); + protoInputChannelStats.SetWaitTimeUs(caChannelStats->WaitTime.MicroSeconds()); + protoInputChannelStats.SetResentMessages(caChannelStats->ResentMessages); } + } - for (auto& protoOutputChannelStats : *protoTask->MutableOutputChannels()) { - if (auto* x = Channels->GetOutputChannelStats(protoOutputChannelStats.GetChannelId())) { - protoOutputChannelStats.SetResentMessages(x->ResentMessages); - } + for (auto& protoOutputChannelStats : *protoTask->MutableOutputChannels()) { + if (auto* x = Channels->GetOutputChannelStats(protoOutputChannelStats.GetChannelId())) { + protoOutputChannelStats.SetResentMessages(x->ResentMessages); + } - if (auto* outputInfo = OutputChannelsMap.FindPtr(protoOutputChannelStats.GetChannelId())) { - if (auto *x = outputInfo->Stats.Get()) { - protoOutputChannelStats.SetBlockedByCapacity(x->BlockedByCapacity); - protoOutputChannelStats.SetNoDstActorId(x->NoDstActorId); - } + if (auto* outputInfo = OutputChannelsMap.FindPtr(protoOutputChannelStats.GetChannelId())) { + if (auto *x = outputInfo->Stats.Get()) { + protoOutputChannelStats.SetBlockedByCapacity(x->BlockedByCapacity); + protoOutputChannelStats.SetNoDstActorId(x->NoDstActorId); } } } diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_stats.cpp b/ydb/library/yql/dq/actors/compute/dq_compute_actor_stats.cpp index cc13ce4d272..648d92ab4aa 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_stats.cpp +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_stats.cpp @@ -5,7 +5,7 @@ namespace NYql { namespace NDq { -void FillTaskRunnerStats(ui64 taskId, ui32 stageId, const TDqTaskRunnerStats& taskStats, +void FillTaskRunnerStats(ui64 taskId, ui32 stageId, const TTaskRunnerStatsBase& taskStats, NDqProto::TDqTaskStats* protoTask, bool withProfileStats) { protoTask->SetTaskId(taskId); diff --git a/ydb/library/yql/dq/actors/task_runner/events.h b/ydb/library/yql/dq/actors/task_runner/events.h index 2acd46de5d2..6d64526cf4d 100644 --- a/ydb/library/yql/dq/actors/task_runner/events.h +++ b/ydb/library/yql/dq/actors/task_runner/events.h @@ -209,13 +209,15 @@ struct TEvChannelPopFinished : NActors::TEventLocal<TEvChannelPopFinished, TTaskRunnerEvents::ES_POP_FINISHED> { TEvChannelPopFinished() = default; TEvChannelPopFinished(ui32 channelId) - : ChannelId(channelId) + : Stats() + , ChannelId(channelId) , Data() , Finished(false) , Changed(false) { } - TEvChannelPopFinished(ui32 channelId, TVector<NDqProto::TData>&& data, bool finished, bool changed, const TTaskRunnerActorSensors& sensors = {}) + TEvChannelPopFinished(ui32 channelId, TVector<NDqProto::TData>&& data, bool finished, bool changed, const TTaskRunnerActorSensors& sensors = {}, TDqTaskRunnerStatsView&& stats = {}) : Sensors(sensors) + , Stats(std::move(stats)) , ChannelId(channelId) , Data(std::move(data)) , Finished(finished) @@ -223,6 +225,7 @@ struct TEvChannelPopFinished { } TTaskRunnerActorSensors Sensors; + NDq::TDqTaskRunnerStatsView Stats; const ui32 ChannelId; TVector<NDqProto::TData> Data; diff --git a/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp b/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp index c5dfa28b380..a3bfa2680a5 100644 --- a/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp +++ b/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp @@ -189,7 +189,9 @@ private: channelId, std::move(chunks), isFinished, - changed), + changed, + {}, + TDqTaskRunnerStatsView(TaskRunner->GetStats())), /*flags=*/0, ev->Cookie)); } diff --git a/ydb/library/yql/dq/runtime/dq_input_channel.h b/ydb/library/yql/dq/runtime/dq_input_channel.h index 27020103ccd..500b95a801e 100644 --- a/ydb/library/yql/dq/runtime/dq_input_channel.h +++ b/ydb/library/yql/dq/runtime/dq_input_channel.h @@ -16,6 +16,20 @@ struct TDqInputChannelStats : TDqInputStats { explicit TDqInputChannelStats(ui64 channelId) : ChannelId(channelId) {} + + template<typename T> + void FromProto(const T& f) + { + this->ChannelId = f.GetChannelId(); + this->Chunks = f.GetChunks(); + this->Bytes = f.GetBytes(); + this->RowsIn = f.GetRowsIn(); + this->RowsOut = f.GetRowsOut(); + this->MaxMemoryUsage = f.GetMaxMemoryUsage(); + //s->StartTs = TInstant::MilliSeconds(f.GetStartTs()); + //s->FinishTs = TInstant::MilliSeconds(f.GetFinishTs()); + this->DeserializationTime = TDuration::MicroSeconds(f.GetDeserializationTimeUs()); + } }; class IDqInputChannel : public IDqInput { diff --git a/ydb/library/yql/dq/runtime/dq_output_channel.h b/ydb/library/yql/dq/runtime/dq_output_channel.h index f7a4887d2d6..d87422fef53 100644 --- a/ydb/library/yql/dq/runtime/dq_output_channel.h +++ b/ydb/library/yql/dq/runtime/dq_output_channel.h @@ -26,6 +26,19 @@ struct TDqOutputChannelStats : TDqOutputStats { explicit TDqOutputChannelStats(ui64 channelId) : ChannelId(channelId) {} + + template<typename T> + void FromProto(const T& f) + { + this->ChannelId = f.GetChannelId(); + this->Chunks = f.GetChunks(); + this->Bytes = f.GetBytes(); + this->RowsIn = f.GetRowsIn(); + this->RowsOut = f.GetRowsOut(); + this->MaxMemoryUsage = f.GetMaxMemoryUsage(); + //s->StartTs = TInstant::MilliSeconds(f.GetStartTs()); + //s->FinishTs = TInstant::MilliSeconds(f.GetFinishTs()); + } }; class IDqOutputChannel : public IDqOutput { diff --git a/ydb/library/yql/dq/runtime/dq_sink.h b/ydb/library/yql/dq/runtime/dq_sink.h index 63c91c96602..447647ae3ac 100644 --- a/ydb/library/yql/dq/runtime/dq_sink.h +++ b/ydb/library/yql/dq/runtime/dq_sink.h @@ -12,6 +12,16 @@ struct TDqSinkStats : TDqOutputStats { explicit TDqSinkStats(ui64 outputIndex) : OutputIndex(outputIndex) {} + + template<typename T> + void FromProto(const T& f) + { + this->Chunks = f.GetChunks(); + this->Bytes = f.GetBytes(); + this->RowsIn = f.GetRowsIn(); + this->RowsOut = f.GetRowsOut(); + this->MaxMemoryUsage = f.GetMaxMemoryUsage(); + } }; class IDqSink : public IDqOutput { diff --git a/ydb/library/yql/dq/runtime/dq_source.h b/ydb/library/yql/dq/runtime/dq_source.h index 01509873f53..3355a4966f9 100644 --- a/ydb/library/yql/dq/runtime/dq_source.h +++ b/ydb/library/yql/dq/runtime/dq_source.h @@ -8,6 +8,19 @@ struct TDqSourceStats : TDqInputStats { explicit TDqSourceStats(ui64 inputIndex) : InputIndex(inputIndex) {} + + template<typename T> + void FromProto(const T& f) + { + this->InputIndex = f.GetInputIndex(); + this->Chunks = f.GetChunks(); + this->Bytes = f.GetBytes(); + this->RowsIn = f.GetRowsIn(); + this->RowsOut = f.GetRowsOut(); + this->MaxMemoryUsage = f.GetMaxMemoryUsage(); + //s->StartTs = TInstant::MilliSeconds(f.GetStartTs()); + //s->FinishTs = TInstant::MilliSeconds(f.GetFinishTs()); + } }; class IDqSource : public IDqInput { diff --git a/ydb/library/yql/dq/runtime/dq_tasks_runner.h b/ydb/library/yql/dq/runtime/dq_tasks_runner.h index a0cc816e91e..e8142ad9475 100644 --- a/ydb/library/yql/dq/runtime/dq_tasks_runner.h +++ b/ydb/library/yql/dq/runtime/dq_tasks_runner.h @@ -66,7 +66,7 @@ struct TMkqlStat { i64 Value = 0; }; -struct TDqTaskRunnerStats { +struct TTaskRunnerStatsBase { // basic stats TDuration BuildCpuTime; TInstant FinishTs; @@ -85,6 +85,130 @@ struct TDqTaskRunnerStats { THashMap<ui64, const TDqOutputChannelStats*> OutputChannels; // Channel id -> Channel stats TVector<TMkqlStat> MkqlStats; + + TTaskRunnerStatsBase() = default; + TTaskRunnerStatsBase(TTaskRunnerStatsBase&&) = default; + TTaskRunnerStatsBase& operator=(TTaskRunnerStatsBase&&) = default; + + virtual ~TTaskRunnerStatsBase() = default; + + template<typename T> + void FromProto(const T& f) + { + //s->StartTs = TInstant::MilliSeconds(f.GetStartTs()); + //s->FinishTs = TInstant::MilliSeconds(f.GetFinishTs()); + this->BuildCpuTime = TDuration::MicroSeconds(f.GetBuildCpuTimeUs()); + this->ComputeCpuTime = TDuration::MicroSeconds(f.GetComputeCpuTimeUs()); + this->RunStatusTimeMetrics.Load(ERunStatus::PendingInput, TDuration::MicroSeconds(f.GetPendingInputTimeUs())); + this->RunStatusTimeMetrics.Load(ERunStatus::PendingOutput, TDuration::MicroSeconds(f.GetPendingOutputTimeUs())); + this->RunStatusTimeMetrics.Load(ERunStatus::Finished, TDuration::MicroSeconds(f.GetFinishTimeUs())); + //s->TotalTime = TDuration::MilliSeconds(f.GetTotalTime()); + this->WaitTime = TDuration::MicroSeconds(f.GetWaitTimeUs()); + this->WaitOutputTime = TDuration::MicroSeconds(f.GetWaitOutputTimeUs()); + + //s->MkqlTotalNodes = f.GetMkqlTotalNodes(); + //s->MkqlCodegenFunctions = f.GetMkqlCodegenFunctions(); + //s->CodeGenTotalInstructions = f.GetCodeGenTotalInstructions(); + //s->CodeGenTotalFunctions = f.GetCodeGenTotalFunctions(); + //s->CodeGenFullTime = f.GetCodeGenFullTime(); + //s->CodeGenFinalizeTime = f.GetCodeGenFinalizeTime(); + //s->CodeGenModulePassTime = f.GetCodeGenModulePassTime(); + + for (const auto& input : f.GetInputChannels()) { + this->MutableInputChannel(input.GetChannelId())->FromProto(input); + } + + for (const auto& output : f.GetOutputChannels()) { + this->MutableOutputChannel(output.GetChannelId())->FromProto(output); + } + + // todo: (whcrc) fill sources and ComputeCpuTimeByRun? + } + +private: + virtual TDqInputChannelStats* MutableInputChannel(ui64 channelId) = 0; + virtual TDqSourceStats* MutableSource(ui64 sourceId) = 0; // todo: (whcrc) unused, not modified by these pointers + virtual TDqOutputChannelStats* MutableOutputChannel(ui64 channelId) = 0; +}; + +struct TDqTaskRunnerStats : public TTaskRunnerStatsBase { + // these stats are owned by TDqTaskRunner + TDqInputChannelStats* MutableInputChannel(ui64 channelId) override { + return const_cast<TDqInputChannelStats*>(InputChannels[channelId]); + } + + TDqSourceStats* MutableSource(ui64 sourceId) override { + return const_cast<TDqSourceStats*>(Sources[sourceId]); + } + + TDqOutputChannelStats* MutableOutputChannel(ui64 channelId) override { + return const_cast<TDqOutputChannelStats*>(OutputChannels[channelId]); + } +}; + +struct TDqTaskRunnerStatsInplace : public TTaskRunnerStatsBase { + // all stats are owned by this object + TVector<THolder<TDqInputChannelStats>> InputChannelHolder; + TVector<THolder<TDqSourceStats>> SourceHolder; + TVector<THolder<TDqOutputChannelStats>> OutputChannelHolder; + + template<typename TStat> + static TStat* GetOrCreate(THashMap<ui64, const TStat*>& mapper, TVector<THolder<TStat>>& holder, ui64 statIdx) { + if (auto it = mapper.find(statIdx); it != mapper.end()) { + return const_cast<TStat*>(it->second); + } + holder.push_back(MakeHolder<TStat>(statIdx)); + mapper[statIdx] = holder.back().Get(); + return holder.back().Get(); + } + + TDqInputChannelStats* MutableInputChannel(ui64 channelId) override { + return GetOrCreate(InputChannels, InputChannelHolder, channelId); + } + + TDqSourceStats* MutableSource(ui64 sourceId) override { + return GetOrCreate(Sources, SourceHolder, sourceId); + } + + TDqOutputChannelStats* MutableOutputChannel(ui64 channelId) override { + return GetOrCreate(OutputChannels, OutputChannelHolder, channelId); + } +}; + +// Provides read access to TTaskRunnerStatsBase +// May or may not own the underlying object +class TDqTaskRunnerStatsView { +public: + TDqTaskRunnerStatsView() : IsDefined(false) {} + + TDqTaskRunnerStatsView(TDqTaskRunnerStatsInplace&& stats) // used in TTaskRunnerActor, cause it constructs stats on-the-fly, and cannot own it due to threaded implementation + : StatsInplace(std::move(stats)) + , StatsPtr(nullptr) + , IsInplace(true) + , IsDefined(true) { + } + + TDqTaskRunnerStatsView(const TDqTaskRunnerStats* stats) // used in TLocalTaskRunnerActor, cause it holds this stats, and does not modify it asyncronously from TDqAsyncComputeActor + : StatsInplace() + , StatsPtr(stats) + , IsInplace(false) + , IsDefined(true) { + } + + const TTaskRunnerStatsBase* Get() { + Y_VERIFY(IsDefined); + return IsInplace ? static_cast<const TTaskRunnerStatsBase*>(&StatsInplace) : StatsPtr; + } + + operator bool() const { + return IsDefined; + } + +private: + TDqTaskRunnerStatsInplace StatsInplace; + const TDqTaskRunnerStats* StatsPtr; + bool IsInplace; + bool IsDefined; }; struct TDqTaskRunnerContext { @@ -186,3 +310,16 @@ TIntrusivePtr<IDqTaskRunner> MakeDqTaskRunner(const TDqTaskRunnerContext& ctx, c const TLogFunc& logFunc); } // namespace NYql::NDq + +template <> +inline void Out<NYql::NDq::TTaskRunnerStatsBase>(IOutputStream& os, TTypeTraits<NYql::NDq::TTaskRunnerStatsBase>::TFuncParam stats) { + os << "TTaskRunnerStatsBase:" << Endl + << "\tBuildCpuTime: " << stats.BuildCpuTime << Endl + << "\tFinishTs: " << stats.FinishTs << Endl + << "\tComputeCpuTime: " << stats.ComputeCpuTime << Endl + << "\tWaitTime: " << stats.WaitTime << Endl + << "\tWaitOutputTime: " << stats.WaitOutputTime << Endl + << "\tsize of InputChannels: " << stats.InputChannels.size() << Endl + << "\tsize of Sources: " << stats.Sources.size() << Endl + << "\tsize of OutputChannels: " << stats.OutputChannels.size(); +} diff --git a/ydb/library/yql/providers/dq/api/protos/task_command_executor.proto b/ydb/library/yql/providers/dq/api/protos/task_command_executor.proto index 6e3d08e5954..3f570f1aa41 100644 --- a/ydb/library/yql/providers/dq/api/protos/task_command_executor.proto +++ b/ydb/library/yql/providers/dq/api/protos/task_command_executor.proto @@ -3,6 +3,7 @@ option cc_enable_arenas = true; package NYql.NDqProto; +import "google/protobuf/any.proto"; import "ydb/library/yql/dq/actors/protos/dq_stats.proto"; import "ydb/library/yql/dq/proto/dq_transport.proto"; import "ydb/library/yql/dq/proto/dq_tasks.proto"; @@ -62,6 +63,7 @@ message TPopResponse { bool Result = 1; TData Data = 2; repeated TMetric Metric = 3; + google.protobuf.Any Stats = 4; } message TSinkPopRequest { diff --git a/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp b/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp index 9b41920b177..a2e7eda819b 100644 --- a/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp +++ b/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp @@ -240,6 +240,49 @@ public: _exit(127); } + NDqProto::TGetStatsResponse GetStats(ui64 taskId) { + const auto stats = Runner->GetStats(); + + NDqProto::TGetStatsResponse response; + auto* s = response.MutableStats(); + s->SetTaskId(taskId); + //s->SetStartTs(stats->StartTs.MilliSeconds()); + //s->SetFinishTs(stats->FinishTs.MilliSeconds()); + s->SetBuildCpuTimeUs(stats->BuildCpuTime.MicroSeconds()); + s->SetComputeCpuTimeUs(stats->ComputeCpuTime.MicroSeconds()); + + // All run statuses metrics + s->SetPendingInputTimeUs(stats->RunStatusTimeMetrics[NDq::ERunStatus::PendingInput].MicroSeconds()); + s->SetPendingOutputTimeUs(stats->RunStatusTimeMetrics[NDq::ERunStatus::PendingOutput].MicroSeconds()); + s->SetFinishTimeUs(stats->RunStatusTimeMetrics[NDq::ERunStatus::Finished].MicroSeconds()); + static_assert(NDq::TRunStatusTimeMetrics::StatusesCount == 3); // Add all statuses here + + //s->SetTotalTime(stats->TotalTime.MilliSeconds()); + s->SetWaitTimeUs(stats->WaitTime.MicroSeconds()); + s->SetWaitOutputTimeUs(stats->WaitOutputTime.MicroSeconds()); + + //s->SetMkqlTotalNodes(stats->MkqlTotalNodes); + //s->SetMkqlCodegenFunctions(stats->MkqlCodegenFunctions); + //s->SetCodeGenTotalInstructions(stats->CodeGenTotalInstructions); + //s->SetCodeGenTotalFunctions(stats->CodeGenTotalFunctions); + //s->SetCodeGenFullTime(stats->CodeGenFullTime); + //s->SetCodeGenFinalizeTime(stats->CodeGenFinalizeTime); + //s->SetCodeGenModulePassTime(stats->CodeGenModulePassTime); + + for (const auto& [id, ss] : stats->OutputChannels) { + ToProto(s->AddOutputChannels(), ss); + } + + for (const auto& [id, ss] : stats->InputChannels) { + ToProto(s->AddInputChannels(), ss); + } + + for (const auto& [id, ss] : stats->Sources) { + ToProto(s->AddSources(), ss); + } + return response; + } + void RunUnsafe() { FunctionRegistry = QueryStat.Measure<TIntrusivePtr<NKikimr::NMiniKQL::IMutableFunctionRegistry>>("CreateFunctionRegistry", [&]() { @@ -432,6 +475,7 @@ public: response.SetResult(channel->Pop(*response.MutableData(), 5 << 20)); UpdateOutputChannelStats(channelId); QueryStat.FlushCounters(response); + response.MutableStats()->PackFrom(GetStats(taskId)); response.Save(&output); break; @@ -533,48 +577,7 @@ public: case NDqProto::TCommandHeader::GET_STATS: { Y_ENSURE(header.GetVersion() >= 3); Y_ENSURE(taskId == Runner->GetTaskId()); - auto stats = Runner->GetStats(); - - NDqProto::TGetStatsResponse response; - auto* s = response.MutableStats(); - s->SetTaskId(taskId); - //s->SetStartTs(stats->StartTs.MilliSeconds()); - //s->SetFinishTs(stats->FinishTs.MilliSeconds()); - s->SetBuildCpuTimeUs(stats->BuildCpuTime.MicroSeconds()); - s->SetComputeCpuTimeUs(stats->ComputeCpuTime.MicroSeconds()); - - // All run statuses metrics - s->SetPendingInputTimeUs(stats->RunStatusTimeMetrics[NDq::ERunStatus::PendingInput].MicroSeconds()); - s->SetPendingOutputTimeUs(stats->RunStatusTimeMetrics[NDq::ERunStatus::PendingOutput].MicroSeconds()); - s->SetFinishTimeUs(stats->RunStatusTimeMetrics[NDq::ERunStatus::Finished].MicroSeconds()); - static_assert(NDq::TRunStatusTimeMetrics::StatusesCount == 3); // Add all statuses here - - //s->SetTotalTime(stats->TotalTime.MilliSeconds()); - s->SetWaitTimeUs(stats->WaitTime.MicroSeconds()); - s->SetWaitOutputTimeUs(stats->WaitOutputTime.MicroSeconds()); - - //s->SetMkqlTotalNodes(stats->MkqlTotalNodes); - //s->SetMkqlCodegenFunctions(stats->MkqlCodegenFunctions); - //s->SetCodeGenTotalInstructions(stats->CodeGenTotalInstructions); - //s->SetCodeGenTotalFunctions(stats->CodeGenTotalFunctions); - //s->SetCodeGenFullTime(stats->CodeGenFullTime); - //s->SetCodeGenFinalizeTime(stats->CodeGenFinalizeTime); - //s->SetCodeGenModulePassTime(stats->CodeGenModulePassTime); - - for (const auto& [id, ss] : stats->OutputChannels) { - ToProto(s->AddOutputChannels(), ss); - } - - for (const auto& [id, ss] : stats->InputChannels) { - ToProto(s->AddInputChannels(), ss); - } - - for (const auto& [id, ss] : stats->Sources) { - ToProto(s->AddSources(), ss); - } - - response.Save(&output); - + GetStats(taskId).Save(&output); break; } case NDqProto::TCommandHeader::GET_STATS_INPUT: { diff --git a/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp b/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp index cdcbd32ef5d..dfc063e903d 100644 --- a/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp +++ b/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp @@ -45,87 +45,6 @@ extern "C" int kill(int pid, int sig); extern "C" int waitpid(int pid, int* status, int options); #endif -template<typename T> -void FromProto(TDqOutputChannelStats* s, const T& f) -{ - s->ChannelId = f.GetChannelId(); - s->Chunks = f.GetChunks(); - s->Bytes = f.GetBytes(); - s->RowsIn = f.GetRowsIn(); - s->RowsOut = f.GetRowsOut(); - s->MaxMemoryUsage = f.GetMaxMemoryUsage(); - //s->StartTs = TInstant::MilliSeconds(f.GetStartTs()); - //s->FinishTs = TInstant::MilliSeconds(f.GetFinishTs()); -} - -template<typename T> -void FromProto(TDqInputChannelStats* s, const T& f) -{ - s->ChannelId = f.GetChannelId(); - s->Chunks = f.GetChunks(); - s->Bytes = f.GetBytes(); - s->RowsIn = f.GetRowsIn(); - s->RowsOut = f.GetRowsOut(); - s->MaxMemoryUsage = f.GetMaxMemoryUsage(); - //s->StartTs = TInstant::MilliSeconds(f.GetStartTs()); - //s->FinishTs = TInstant::MilliSeconds(f.GetFinishTs()); - s->DeserializationTime = TDuration::MicroSeconds(f.GetDeserializationTimeUs()); -} - -template<typename T> -void FromProto(TDqSourceStats* s, const T& f) -{ - s->InputIndex = f.GetInputIndex(); - s->Chunks = f.GetChunks(); - s->Bytes = f.GetBytes(); - s->RowsIn = f.GetRowsIn(); - s->RowsOut = f.GetRowsOut(); - s->MaxMemoryUsage = f.GetMaxMemoryUsage(); - //s->StartTs = TInstant::MilliSeconds(f.GetStartTs()); - //s->FinishTs = TInstant::MilliSeconds(f.GetFinishTs()); -} - -template<typename T> -void FromProto(TDqSinkStats* s, const T& f) -{ - s->Chunks = f.GetChunks(); - s->Bytes = f.GetBytes(); - s->RowsIn = f.GetRowsIn(); - s->RowsOut = f.GetRowsOut(); - s->MaxMemoryUsage = f.GetMaxMemoryUsage(); -} - -template<typename T> -void FromProto(TDqTaskRunnerStats* s, const T& f) -{ - //s->StartTs = TInstant::MilliSeconds(f.GetStartTs()); - //s->FinishTs = TInstant::MilliSeconds(f.GetFinishTs()); - s->BuildCpuTime = TDuration::MicroSeconds(f.GetBuildCpuTimeUs()); - s->ComputeCpuTime = TDuration::MicroSeconds(f.GetComputeCpuTimeUs()); - s->RunStatusTimeMetrics.Load(ERunStatus::PendingInput, TDuration::MicroSeconds(f.GetPendingInputTimeUs())); - s->RunStatusTimeMetrics.Load(ERunStatus::PendingOutput, TDuration::MicroSeconds(f.GetPendingOutputTimeUs())); - s->RunStatusTimeMetrics.Load(ERunStatus::Finished, TDuration::MicroSeconds(f.GetFinishTimeUs())); - //s->TotalTime = TDuration::MilliSeconds(f.GetTotalTime()); - s->WaitTime = TDuration::MicroSeconds(f.GetWaitTimeUs()); - s->WaitOutputTime = TDuration::MicroSeconds(f.GetWaitOutputTimeUs()); - - //s->MkqlTotalNodes = f.GetMkqlTotalNodes(); - //s->MkqlCodegenFunctions = f.GetMkqlCodegenFunctions(); - //s->CodeGenTotalInstructions = f.GetCodeGenTotalInstructions(); - //s->CodeGenTotalFunctions = f.GetCodeGenTotalFunctions(); - //s->CodeGenFullTime = f.GetCodeGenFullTime(); - //s->CodeGenFinalizeTime = f.GetCodeGenFinalizeTime(); - //s->CodeGenModulePassTime = f.GetCodeGenModulePassTime(); - - for (const auto& input : f.GetInputChannels()) { - FromProto(const_cast<TDqInputChannelStats*>(s->InputChannels[input.GetChannelId()]), input); - } - - for (const auto& output : f.GetOutputChannels()) { - FromProto(const_cast<TDqOutputChannelStats*>(s->OutputChannels[output.GetChannelId()]), output); - } -} - class TChildProcess: private TNonCopyable { public: TChildProcess(const TString& exeName, const TVector<TString>& args, const THashMap<TString, TString>& env, const TString& workDir) @@ -658,7 +577,7 @@ public: NDqProto::TGetStatsInputResponse response; response.Load(&Input); - FromProto(&Stats, response.GetStats()); + Stats.FromProto(response.GetStats()); return &Stats; } catch (...) { TaskRunner->RaiseException(); @@ -820,7 +739,7 @@ public: NDqProto::TGetStatsSourceResponse response; response.Load(&Input); - FromProto(&Stats, response.GetStats()); + Stats.FromProto(response.GetStats()); return &Stats; } catch (...) { TaskRunner->RaiseException(); @@ -1055,7 +974,7 @@ public: NDqProto::TGetStatsOutputResponse response; response.Load(&Input); - FromProto(&Stats, response.GetStats()); + Stats.FromProto(response.GetStats()); return &Stats; } catch (...) { TaskRunner->RaiseException(); @@ -1211,7 +1130,7 @@ public: NDqProto::TSinkStatsResponse response; response.Load(&Input); - FromProto(&Stats, response.GetStats()); + Stats.FromProto(response.GetStats()); return &Stats; } catch (...) { TaskRunner->RaiseException(); @@ -1612,7 +1531,7 @@ public: NDqProto::TGetStatsResponse response; response.Load(&Delegate->GetInput()); - FromProto(&Stats, response.GetStats()); + Stats.FromProto(response.GetStats()); return &Stats; } catch (...) { diff --git a/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp b/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp index 00d60ca63c3..af7151f48b5 100644 --- a/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp +++ b/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp @@ -345,16 +345,17 @@ private: } TVector<NDqProto::TData> chunks; + NDqProto::TPopResponse lastPop; NDqProto::TPopResponse response; for (;maxChunks && remain > 0 && !isFinished && hasData; maxChunks--, remain -= dataSize) { NDqProto::TData data; - NDqProto::TPopResponse pop = channel->Pop(data, remain); + lastPop = std::move(channel->Pop(data, remain)); - for (auto& metric : pop.GetMetric()) { + for (auto& metric : lastPop.GetMetric()) { *response.AddMetric() = metric; } - hasData = pop.GetResult(); + hasData = lastPop.GetResult(); dataSize = data.GetRaw().size(); isFinished = !hasData && channel->IsFinished(); response.SetResult(response.GetResult() || hasData); @@ -365,6 +366,11 @@ private: } } + TDqTaskRunnerStatsInplace stats; + NDqProto::TGetStatsResponse pbStats; + lastPop.GetStats().UnpackTo(&pbStats); + stats.FromProto(pbStats.GetStats()); + actorSystem->Send( new IEventHandle( replyTo, @@ -374,7 +380,8 @@ private: std::move(chunks), isFinished, changed, - GetSensors(response)), + GetSensors(response), + TDqTaskRunnerStatsView(std::move(stats))), /*flags=*/0, cookie)); } catch (...) { |