aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorwhcrc <whcrc@yandex-team.ru>2022-03-14 19:14:46 +0300
committerwhcrc <whcrc@yandex-team.ru>2022-03-14 19:14:46 +0300
commitca1ef1496bb884724f4b349b7d6a6a825755c405 (patch)
tree92568b4328d46fd308916f0c24d0c7b39fb1a00d
parent64b019f62d1da1b2772cca9b9d068a4fbd634378 (diff)
downloadydb-ca1ef1496bb884724f4b349b7d6a6a825755c405.tar.gz
YQL-14358: add statistics to dq_async_compute_actor
ref:fafcbbd6fa1bc80e8022c0b529ea99dd682dbbba
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp9
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_compute_actor.h2
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h84
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_compute_actor_stats.cpp2
-rw-r--r--ydb/library/yql/dq/actors/task_runner/events.h7
-rw-r--r--ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp4
-rw-r--r--ydb/library/yql/dq/runtime/dq_input_channel.h14
-rw-r--r--ydb/library/yql/dq/runtime/dq_output_channel.h13
-rw-r--r--ydb/library/yql/dq/runtime/dq_sink.h10
-rw-r--r--ydb/library/yql/dq/runtime/dq_source.h13
-rw-r--r--ydb/library/yql/dq/runtime/dq_tasks_runner.h139
-rw-r--r--ydb/library/yql/providers/dq/api/protos/task_command_executor.proto2
-rw-r--r--ydb/library/yql/providers/dq/runtime/task_command_executor.cpp87
-rw-r--r--ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp91
-rw-r--r--ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp15
15 files changed, 314 insertions, 178 deletions
diff --git a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp
index 72346aaff11..e200fef0b00 100644
--- a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp
+++ b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp
@@ -289,6 +289,10 @@ private:
}
void OnPopFinished(NTaskRunnerActor::TEvChannelPopFinished::TPtr& ev, const NActors::TActorContext&) {
+ if (ev->Get()->Stats) {
+ TaskRunnerStats = std::move(ev->Get()->Stats);
+ }
+ CA_LOG_D("OnPopFinished, stats: " << *TaskRunnerStats.Get());
auto channelId = ev->Get()->ChannelId;
auto finished = ev->Get()->Finished;
auto dataWasSent = ev->Get()->Changed;
@@ -385,6 +389,10 @@ private:
CheckRunStatus();
}
+ const NYql::NDq::TTaskRunnerStatsBase* GetTaskRunnerStats() override {
+ return TaskRunnerStats.Get();
+ }
+
NKikimr::NMiniKQL::TTypeEnvironment* TypeEnv;
NTaskRunnerActor::ITaskRunnerActor* TaskRunnerActor;
NActors::TActorId TaskRunnerActorId;
@@ -403,6 +411,7 @@ private:
};
THashMap<ui64, TTakeInputChannelData> TakeInputChannelDataRequests;
ui64 Cookie = 0;
+ NDq::TDqTaskRunnerStatsView TaskRunnerStats;
};
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor.h
index a3afe7a55b1..d03d1e68f52 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor.h
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor.h
@@ -253,7 +253,7 @@ using TTaskRunnerFactory = std::function<
TIntrusivePtr<IDqTaskRunner>(const NDqProto::TDqTask& task, const TLogFunc& logFunc)
>;
-void FillTaskRunnerStats(ui64 taskId, ui32 stageId, const TDqTaskRunnerStats& taskStats,
+void FillTaskRunnerStats(ui64 taskId, ui32 stageId, const TTaskRunnerStatsBase& taskStats,
NDqProto::TDqTaskStats* protoTask, bool withProfileStats);
NActors::IActor* CreateDqComputeActor(const NActors::TActorId& executerId, const TTxId& txId, NDqProto::TDqTask&& task,
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
index c90b8175346..c9f58160f6d 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
@@ -1347,6 +1347,14 @@ private:
}
}
+ virtual const NYql::NDq::TTaskRunnerStatsBase* GetTaskRunnerStats() {
+ if (!TaskRunner) {
+ return nullptr;
+ }
+ TaskRunner->UpdateStats();
+ return TaskRunner->GetStats();
+ }
+
void FillStats(NDqProto::TDqComputeActorStats* dst, bool last) {
if (!BasicStats) {
return;
@@ -1364,58 +1372,54 @@ private:
dst->SetMkqlExtraMemoryRequests(ProfileStats->MkqlExtraMemoryRequests);
}
- if (TaskRunner) {
- TaskRunner->UpdateStats();
+ if (auto* taskStats = GetTaskRunnerStats()) {
+ auto* protoTask = dst->AddTasks();
+ FillTaskRunnerStats(Task.GetId(), Task.GetStageId(), *taskStats, protoTask, (bool) ProfileStats);
- if (auto* taskStats = TaskRunner->GetStats()) {
- auto* protoTask = dst->AddTasks();
- FillTaskRunnerStats(Task.GetId(), Task.GetStageId(), *taskStats, protoTask, (bool) ProfileStats);
+ for (auto& [outputIndex, sinkInfo] : SinksMap) {
+ if (auto* sinkStats = sinkInfo.Sink ? sinkInfo.Sink->GetStats() : nullptr) {
+ protoTask->SetOutputRows(protoTask->GetOutputRows() + sinkStats->RowsIn);
+ protoTask->SetOutputBytes(protoTask->GetOutputBytes() + sinkStats->Bytes);
- for (auto& [outputIndex, sinkInfo] : SinksMap) {
- if (auto* sinkStats = sinkInfo.Sink ? sinkInfo.Sink->GetStats() : nullptr) {
- protoTask->SetOutputRows(protoTask->GetOutputRows() + sinkStats->RowsIn);
- protoTask->SetOutputBytes(protoTask->GetOutputBytes() + sinkStats->Bytes);
+ if (ProfileStats) {
+ auto* protoSink = protoTask->AddSinks();
+ protoSink->SetOutputIndex(outputIndex);
- if (ProfileStats) {
- auto* protoSink = protoTask->AddSinks();
- protoSink->SetOutputIndex(outputIndex);
+ protoSink->SetChunks(sinkStats->Chunks);
+ protoSink->SetBytes(sinkStats->Bytes);
+ protoSink->SetRowsIn(sinkStats->RowsIn);
+ protoSink->SetRowsOut(sinkStats->RowsOut);
- protoSink->SetChunks(sinkStats->Chunks);
- protoSink->SetBytes(sinkStats->Bytes);
- protoSink->SetRowsIn(sinkStats->RowsIn);
- protoSink->SetRowsOut(sinkStats->RowsOut);
-
- protoSink->SetMaxMemoryUsage(sinkStats->MaxMemoryUsage);
- protoSink->SetErrorsCount(sinkInfo.IssuesBuffer.GetAllAddedIssuesCount());
- }
+ protoSink->SetMaxMemoryUsage(sinkStats->MaxMemoryUsage);
+ protoSink->SetErrorsCount(sinkInfo.IssuesBuffer.GetAllAddedIssuesCount());
}
}
+ }
- if (ProfileStats) {
- for (auto& protoSource : *protoTask->MutableSources()) {
- if (auto* sourceInfo = SourcesMap.FindPtr(protoSource.GetInputIndex())) {
- protoSource.SetErrorsCount(sourceInfo->IssuesBuffer.GetAllAddedIssuesCount());
- }
+ if (ProfileStats) {
+ for (auto& protoSource : *protoTask->MutableSources()) {
+ if (auto* sourceInfo = SourcesMap.FindPtr(protoSource.GetInputIndex())) {
+ protoSource.SetErrorsCount(sourceInfo->IssuesBuffer.GetAllAddedIssuesCount());
}
+ }
- for (auto& protoInputChannelStats : *protoTask->MutableInputChannels()) {
- if (auto* caChannelStats = Channels->GetInputChannelStats(protoInputChannelStats.GetChannelId())) {
- protoInputChannelStats.SetPollRequests(caChannelStats->PollRequests);
- protoInputChannelStats.SetWaitTimeUs(caChannelStats->WaitTime.MicroSeconds());
- protoInputChannelStats.SetResentMessages(caChannelStats->ResentMessages);
- }
+ for (auto& protoInputChannelStats : *protoTask->MutableInputChannels()) {
+ if (auto* caChannelStats = Channels->GetInputChannelStats(protoInputChannelStats.GetChannelId())) {
+ protoInputChannelStats.SetPollRequests(caChannelStats->PollRequests);
+ protoInputChannelStats.SetWaitTimeUs(caChannelStats->WaitTime.MicroSeconds());
+ protoInputChannelStats.SetResentMessages(caChannelStats->ResentMessages);
}
+ }
- for (auto& protoOutputChannelStats : *protoTask->MutableOutputChannels()) {
- if (auto* x = Channels->GetOutputChannelStats(protoOutputChannelStats.GetChannelId())) {
- protoOutputChannelStats.SetResentMessages(x->ResentMessages);
- }
+ for (auto& protoOutputChannelStats : *protoTask->MutableOutputChannels()) {
+ if (auto* x = Channels->GetOutputChannelStats(protoOutputChannelStats.GetChannelId())) {
+ protoOutputChannelStats.SetResentMessages(x->ResentMessages);
+ }
- if (auto* outputInfo = OutputChannelsMap.FindPtr(protoOutputChannelStats.GetChannelId())) {
- if (auto *x = outputInfo->Stats.Get()) {
- protoOutputChannelStats.SetBlockedByCapacity(x->BlockedByCapacity);
- protoOutputChannelStats.SetNoDstActorId(x->NoDstActorId);
- }
+ if (auto* outputInfo = OutputChannelsMap.FindPtr(protoOutputChannelStats.GetChannelId())) {
+ if (auto *x = outputInfo->Stats.Get()) {
+ protoOutputChannelStats.SetBlockedByCapacity(x->BlockedByCapacity);
+ protoOutputChannelStats.SetNoDstActorId(x->NoDstActorId);
}
}
}
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_stats.cpp b/ydb/library/yql/dq/actors/compute/dq_compute_actor_stats.cpp
index cc13ce4d272..648d92ab4aa 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_stats.cpp
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_stats.cpp
@@ -5,7 +5,7 @@
namespace NYql {
namespace NDq {
-void FillTaskRunnerStats(ui64 taskId, ui32 stageId, const TDqTaskRunnerStats& taskStats,
+void FillTaskRunnerStats(ui64 taskId, ui32 stageId, const TTaskRunnerStatsBase& taskStats,
NDqProto::TDqTaskStats* protoTask, bool withProfileStats)
{
protoTask->SetTaskId(taskId);
diff --git a/ydb/library/yql/dq/actors/task_runner/events.h b/ydb/library/yql/dq/actors/task_runner/events.h
index 2acd46de5d2..6d64526cf4d 100644
--- a/ydb/library/yql/dq/actors/task_runner/events.h
+++ b/ydb/library/yql/dq/actors/task_runner/events.h
@@ -209,13 +209,15 @@ struct TEvChannelPopFinished
: NActors::TEventLocal<TEvChannelPopFinished, TTaskRunnerEvents::ES_POP_FINISHED> {
TEvChannelPopFinished() = default;
TEvChannelPopFinished(ui32 channelId)
- : ChannelId(channelId)
+ : Stats()
+ , ChannelId(channelId)
, Data()
, Finished(false)
, Changed(false)
{ }
- TEvChannelPopFinished(ui32 channelId, TVector<NDqProto::TData>&& data, bool finished, bool changed, const TTaskRunnerActorSensors& sensors = {})
+ TEvChannelPopFinished(ui32 channelId, TVector<NDqProto::TData>&& data, bool finished, bool changed, const TTaskRunnerActorSensors& sensors = {}, TDqTaskRunnerStatsView&& stats = {})
: Sensors(sensors)
+ , Stats(std::move(stats))
, ChannelId(channelId)
, Data(std::move(data))
, Finished(finished)
@@ -223,6 +225,7 @@ struct TEvChannelPopFinished
{ }
TTaskRunnerActorSensors Sensors;
+ NDq::TDqTaskRunnerStatsView Stats;
const ui32 ChannelId;
TVector<NDqProto::TData> Data;
diff --git a/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp b/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp
index c5dfa28b380..a3bfa2680a5 100644
--- a/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp
+++ b/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp
@@ -189,7 +189,9 @@ private:
channelId,
std::move(chunks),
isFinished,
- changed),
+ changed,
+ {},
+ TDqTaskRunnerStatsView(TaskRunner->GetStats())),
/*flags=*/0,
ev->Cookie));
}
diff --git a/ydb/library/yql/dq/runtime/dq_input_channel.h b/ydb/library/yql/dq/runtime/dq_input_channel.h
index 27020103ccd..500b95a801e 100644
--- a/ydb/library/yql/dq/runtime/dq_input_channel.h
+++ b/ydb/library/yql/dq/runtime/dq_input_channel.h
@@ -16,6 +16,20 @@ struct TDqInputChannelStats : TDqInputStats {
explicit TDqInputChannelStats(ui64 channelId)
: ChannelId(channelId) {}
+
+ template<typename T>
+ void FromProto(const T& f)
+ {
+ this->ChannelId = f.GetChannelId();
+ this->Chunks = f.GetChunks();
+ this->Bytes = f.GetBytes();
+ this->RowsIn = f.GetRowsIn();
+ this->RowsOut = f.GetRowsOut();
+ this->MaxMemoryUsage = f.GetMaxMemoryUsage();
+ //s->StartTs = TInstant::MilliSeconds(f.GetStartTs());
+ //s->FinishTs = TInstant::MilliSeconds(f.GetFinishTs());
+ this->DeserializationTime = TDuration::MicroSeconds(f.GetDeserializationTimeUs());
+ }
};
class IDqInputChannel : public IDqInput {
diff --git a/ydb/library/yql/dq/runtime/dq_output_channel.h b/ydb/library/yql/dq/runtime/dq_output_channel.h
index f7a4887d2d6..d87422fef53 100644
--- a/ydb/library/yql/dq/runtime/dq_output_channel.h
+++ b/ydb/library/yql/dq/runtime/dq_output_channel.h
@@ -26,6 +26,19 @@ struct TDqOutputChannelStats : TDqOutputStats {
explicit TDqOutputChannelStats(ui64 channelId)
: ChannelId(channelId) {}
+
+ template<typename T>
+ void FromProto(const T& f)
+ {
+ this->ChannelId = f.GetChannelId();
+ this->Chunks = f.GetChunks();
+ this->Bytes = f.GetBytes();
+ this->RowsIn = f.GetRowsIn();
+ this->RowsOut = f.GetRowsOut();
+ this->MaxMemoryUsage = f.GetMaxMemoryUsage();
+ //s->StartTs = TInstant::MilliSeconds(f.GetStartTs());
+ //s->FinishTs = TInstant::MilliSeconds(f.GetFinishTs());
+ }
};
class IDqOutputChannel : public IDqOutput {
diff --git a/ydb/library/yql/dq/runtime/dq_sink.h b/ydb/library/yql/dq/runtime/dq_sink.h
index 63c91c96602..447647ae3ac 100644
--- a/ydb/library/yql/dq/runtime/dq_sink.h
+++ b/ydb/library/yql/dq/runtime/dq_sink.h
@@ -12,6 +12,16 @@ struct TDqSinkStats : TDqOutputStats {
explicit TDqSinkStats(ui64 outputIndex)
: OutputIndex(outputIndex) {}
+
+ template<typename T>
+ void FromProto(const T& f)
+ {
+ this->Chunks = f.GetChunks();
+ this->Bytes = f.GetBytes();
+ this->RowsIn = f.GetRowsIn();
+ this->RowsOut = f.GetRowsOut();
+ this->MaxMemoryUsage = f.GetMaxMemoryUsage();
+ }
};
class IDqSink : public IDqOutput {
diff --git a/ydb/library/yql/dq/runtime/dq_source.h b/ydb/library/yql/dq/runtime/dq_source.h
index 01509873f53..3355a4966f9 100644
--- a/ydb/library/yql/dq/runtime/dq_source.h
+++ b/ydb/library/yql/dq/runtime/dq_source.h
@@ -8,6 +8,19 @@ struct TDqSourceStats : TDqInputStats {
explicit TDqSourceStats(ui64 inputIndex)
: InputIndex(inputIndex) {}
+
+ template<typename T>
+ void FromProto(const T& f)
+ {
+ this->InputIndex = f.GetInputIndex();
+ this->Chunks = f.GetChunks();
+ this->Bytes = f.GetBytes();
+ this->RowsIn = f.GetRowsIn();
+ this->RowsOut = f.GetRowsOut();
+ this->MaxMemoryUsage = f.GetMaxMemoryUsage();
+ //s->StartTs = TInstant::MilliSeconds(f.GetStartTs());
+ //s->FinishTs = TInstant::MilliSeconds(f.GetFinishTs());
+ }
};
class IDqSource : public IDqInput {
diff --git a/ydb/library/yql/dq/runtime/dq_tasks_runner.h b/ydb/library/yql/dq/runtime/dq_tasks_runner.h
index a0cc816e91e..e8142ad9475 100644
--- a/ydb/library/yql/dq/runtime/dq_tasks_runner.h
+++ b/ydb/library/yql/dq/runtime/dq_tasks_runner.h
@@ -66,7 +66,7 @@ struct TMkqlStat {
i64 Value = 0;
};
-struct TDqTaskRunnerStats {
+struct TTaskRunnerStatsBase {
// basic stats
TDuration BuildCpuTime;
TInstant FinishTs;
@@ -85,6 +85,130 @@ struct TDqTaskRunnerStats {
THashMap<ui64, const TDqOutputChannelStats*> OutputChannels; // Channel id -> Channel stats
TVector<TMkqlStat> MkqlStats;
+
+ TTaskRunnerStatsBase() = default;
+ TTaskRunnerStatsBase(TTaskRunnerStatsBase&&) = default;
+ TTaskRunnerStatsBase& operator=(TTaskRunnerStatsBase&&) = default;
+
+ virtual ~TTaskRunnerStatsBase() = default;
+
+ template<typename T>
+ void FromProto(const T& f)
+ {
+ //s->StartTs = TInstant::MilliSeconds(f.GetStartTs());
+ //s->FinishTs = TInstant::MilliSeconds(f.GetFinishTs());
+ this->BuildCpuTime = TDuration::MicroSeconds(f.GetBuildCpuTimeUs());
+ this->ComputeCpuTime = TDuration::MicroSeconds(f.GetComputeCpuTimeUs());
+ this->RunStatusTimeMetrics.Load(ERunStatus::PendingInput, TDuration::MicroSeconds(f.GetPendingInputTimeUs()));
+ this->RunStatusTimeMetrics.Load(ERunStatus::PendingOutput, TDuration::MicroSeconds(f.GetPendingOutputTimeUs()));
+ this->RunStatusTimeMetrics.Load(ERunStatus::Finished, TDuration::MicroSeconds(f.GetFinishTimeUs()));
+ //s->TotalTime = TDuration::MilliSeconds(f.GetTotalTime());
+ this->WaitTime = TDuration::MicroSeconds(f.GetWaitTimeUs());
+ this->WaitOutputTime = TDuration::MicroSeconds(f.GetWaitOutputTimeUs());
+
+ //s->MkqlTotalNodes = f.GetMkqlTotalNodes();
+ //s->MkqlCodegenFunctions = f.GetMkqlCodegenFunctions();
+ //s->CodeGenTotalInstructions = f.GetCodeGenTotalInstructions();
+ //s->CodeGenTotalFunctions = f.GetCodeGenTotalFunctions();
+ //s->CodeGenFullTime = f.GetCodeGenFullTime();
+ //s->CodeGenFinalizeTime = f.GetCodeGenFinalizeTime();
+ //s->CodeGenModulePassTime = f.GetCodeGenModulePassTime();
+
+ for (const auto& input : f.GetInputChannels()) {
+ this->MutableInputChannel(input.GetChannelId())->FromProto(input);
+ }
+
+ for (const auto& output : f.GetOutputChannels()) {
+ this->MutableOutputChannel(output.GetChannelId())->FromProto(output);
+ }
+
+ // todo: (whcrc) fill sources and ComputeCpuTimeByRun?
+ }
+
+private:
+ virtual TDqInputChannelStats* MutableInputChannel(ui64 channelId) = 0;
+ virtual TDqSourceStats* MutableSource(ui64 sourceId) = 0; // todo: (whcrc) unused, not modified by these pointers
+ virtual TDqOutputChannelStats* MutableOutputChannel(ui64 channelId) = 0;
+};
+
+struct TDqTaskRunnerStats : public TTaskRunnerStatsBase {
+ // these stats are owned by TDqTaskRunner
+ TDqInputChannelStats* MutableInputChannel(ui64 channelId) override {
+ return const_cast<TDqInputChannelStats*>(InputChannels[channelId]);
+ }
+
+ TDqSourceStats* MutableSource(ui64 sourceId) override {
+ return const_cast<TDqSourceStats*>(Sources[sourceId]);
+ }
+
+ TDqOutputChannelStats* MutableOutputChannel(ui64 channelId) override {
+ return const_cast<TDqOutputChannelStats*>(OutputChannels[channelId]);
+ }
+};
+
+struct TDqTaskRunnerStatsInplace : public TTaskRunnerStatsBase {
+ // all stats are owned by this object
+ TVector<THolder<TDqInputChannelStats>> InputChannelHolder;
+ TVector<THolder<TDqSourceStats>> SourceHolder;
+ TVector<THolder<TDqOutputChannelStats>> OutputChannelHolder;
+
+ template<typename TStat>
+ static TStat* GetOrCreate(THashMap<ui64, const TStat*>& mapper, TVector<THolder<TStat>>& holder, ui64 statIdx) {
+ if (auto it = mapper.find(statIdx); it != mapper.end()) {
+ return const_cast<TStat*>(it->second);
+ }
+ holder.push_back(MakeHolder<TStat>(statIdx));
+ mapper[statIdx] = holder.back().Get();
+ return holder.back().Get();
+ }
+
+ TDqInputChannelStats* MutableInputChannel(ui64 channelId) override {
+ return GetOrCreate(InputChannels, InputChannelHolder, channelId);
+ }
+
+ TDqSourceStats* MutableSource(ui64 sourceId) override {
+ return GetOrCreate(Sources, SourceHolder, sourceId);
+ }
+
+ TDqOutputChannelStats* MutableOutputChannel(ui64 channelId) override {
+ return GetOrCreate(OutputChannels, OutputChannelHolder, channelId);
+ }
+};
+
+// Provides read access to TTaskRunnerStatsBase
+// May or may not own the underlying object
+class TDqTaskRunnerStatsView {
+public:
+ TDqTaskRunnerStatsView() : IsDefined(false) {}
+
+ TDqTaskRunnerStatsView(TDqTaskRunnerStatsInplace&& stats) // used in TTaskRunnerActor, cause it constructs stats on-the-fly, and cannot own it due to threaded implementation
+ : StatsInplace(std::move(stats))
+ , StatsPtr(nullptr)
+ , IsInplace(true)
+ , IsDefined(true) {
+ }
+
+ TDqTaskRunnerStatsView(const TDqTaskRunnerStats* stats) // used in TLocalTaskRunnerActor, cause it holds this stats, and does not modify it asyncronously from TDqAsyncComputeActor
+ : StatsInplace()
+ , StatsPtr(stats)
+ , IsInplace(false)
+ , IsDefined(true) {
+ }
+
+ const TTaskRunnerStatsBase* Get() {
+ Y_VERIFY(IsDefined);
+ return IsInplace ? static_cast<const TTaskRunnerStatsBase*>(&StatsInplace) : StatsPtr;
+ }
+
+ operator bool() const {
+ return IsDefined;
+ }
+
+private:
+ TDqTaskRunnerStatsInplace StatsInplace;
+ const TDqTaskRunnerStats* StatsPtr;
+ bool IsInplace;
+ bool IsDefined;
};
struct TDqTaskRunnerContext {
@@ -186,3 +310,16 @@ TIntrusivePtr<IDqTaskRunner> MakeDqTaskRunner(const TDqTaskRunnerContext& ctx, c
const TLogFunc& logFunc);
} // namespace NYql::NDq
+
+template <>
+inline void Out<NYql::NDq::TTaskRunnerStatsBase>(IOutputStream& os, TTypeTraits<NYql::NDq::TTaskRunnerStatsBase>::TFuncParam stats) {
+ os << "TTaskRunnerStatsBase:" << Endl
+ << "\tBuildCpuTime: " << stats.BuildCpuTime << Endl
+ << "\tFinishTs: " << stats.FinishTs << Endl
+ << "\tComputeCpuTime: " << stats.ComputeCpuTime << Endl
+ << "\tWaitTime: " << stats.WaitTime << Endl
+ << "\tWaitOutputTime: " << stats.WaitOutputTime << Endl
+ << "\tsize of InputChannels: " << stats.InputChannels.size() << Endl
+ << "\tsize of Sources: " << stats.Sources.size() << Endl
+ << "\tsize of OutputChannels: " << stats.OutputChannels.size();
+}
diff --git a/ydb/library/yql/providers/dq/api/protos/task_command_executor.proto b/ydb/library/yql/providers/dq/api/protos/task_command_executor.proto
index 6e3d08e5954..3f570f1aa41 100644
--- a/ydb/library/yql/providers/dq/api/protos/task_command_executor.proto
+++ b/ydb/library/yql/providers/dq/api/protos/task_command_executor.proto
@@ -3,6 +3,7 @@ option cc_enable_arenas = true;
package NYql.NDqProto;
+import "google/protobuf/any.proto";
import "ydb/library/yql/dq/actors/protos/dq_stats.proto";
import "ydb/library/yql/dq/proto/dq_transport.proto";
import "ydb/library/yql/dq/proto/dq_tasks.proto";
@@ -62,6 +63,7 @@ message TPopResponse {
bool Result = 1;
TData Data = 2;
repeated TMetric Metric = 3;
+ google.protobuf.Any Stats = 4;
}
message TSinkPopRequest {
diff --git a/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp b/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp
index 9b41920b177..a2e7eda819b 100644
--- a/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp
+++ b/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp
@@ -240,6 +240,49 @@ public:
_exit(127);
}
+ NDqProto::TGetStatsResponse GetStats(ui64 taskId) {
+ const auto stats = Runner->GetStats();
+
+ NDqProto::TGetStatsResponse response;
+ auto* s = response.MutableStats();
+ s->SetTaskId(taskId);
+ //s->SetStartTs(stats->StartTs.MilliSeconds());
+ //s->SetFinishTs(stats->FinishTs.MilliSeconds());
+ s->SetBuildCpuTimeUs(stats->BuildCpuTime.MicroSeconds());
+ s->SetComputeCpuTimeUs(stats->ComputeCpuTime.MicroSeconds());
+
+ // All run statuses metrics
+ s->SetPendingInputTimeUs(stats->RunStatusTimeMetrics[NDq::ERunStatus::PendingInput].MicroSeconds());
+ s->SetPendingOutputTimeUs(stats->RunStatusTimeMetrics[NDq::ERunStatus::PendingOutput].MicroSeconds());
+ s->SetFinishTimeUs(stats->RunStatusTimeMetrics[NDq::ERunStatus::Finished].MicroSeconds());
+ static_assert(NDq::TRunStatusTimeMetrics::StatusesCount == 3); // Add all statuses here
+
+ //s->SetTotalTime(stats->TotalTime.MilliSeconds());
+ s->SetWaitTimeUs(stats->WaitTime.MicroSeconds());
+ s->SetWaitOutputTimeUs(stats->WaitOutputTime.MicroSeconds());
+
+ //s->SetMkqlTotalNodes(stats->MkqlTotalNodes);
+ //s->SetMkqlCodegenFunctions(stats->MkqlCodegenFunctions);
+ //s->SetCodeGenTotalInstructions(stats->CodeGenTotalInstructions);
+ //s->SetCodeGenTotalFunctions(stats->CodeGenTotalFunctions);
+ //s->SetCodeGenFullTime(stats->CodeGenFullTime);
+ //s->SetCodeGenFinalizeTime(stats->CodeGenFinalizeTime);
+ //s->SetCodeGenModulePassTime(stats->CodeGenModulePassTime);
+
+ for (const auto& [id, ss] : stats->OutputChannels) {
+ ToProto(s->AddOutputChannels(), ss);
+ }
+
+ for (const auto& [id, ss] : stats->InputChannels) {
+ ToProto(s->AddInputChannels(), ss);
+ }
+
+ for (const auto& [id, ss] : stats->Sources) {
+ ToProto(s->AddSources(), ss);
+ }
+ return response;
+ }
+
void RunUnsafe()
{
FunctionRegistry = QueryStat.Measure<TIntrusivePtr<NKikimr::NMiniKQL::IMutableFunctionRegistry>>("CreateFunctionRegistry", [&]() {
@@ -432,6 +475,7 @@ public:
response.SetResult(channel->Pop(*response.MutableData(), 5 << 20));
UpdateOutputChannelStats(channelId);
QueryStat.FlushCounters(response);
+ response.MutableStats()->PackFrom(GetStats(taskId));
response.Save(&output);
break;
@@ -533,48 +577,7 @@ public:
case NDqProto::TCommandHeader::GET_STATS: {
Y_ENSURE(header.GetVersion() >= 3);
Y_ENSURE(taskId == Runner->GetTaskId());
- auto stats = Runner->GetStats();
-
- NDqProto::TGetStatsResponse response;
- auto* s = response.MutableStats();
- s->SetTaskId(taskId);
- //s->SetStartTs(stats->StartTs.MilliSeconds());
- //s->SetFinishTs(stats->FinishTs.MilliSeconds());
- s->SetBuildCpuTimeUs(stats->BuildCpuTime.MicroSeconds());
- s->SetComputeCpuTimeUs(stats->ComputeCpuTime.MicroSeconds());
-
- // All run statuses metrics
- s->SetPendingInputTimeUs(stats->RunStatusTimeMetrics[NDq::ERunStatus::PendingInput].MicroSeconds());
- s->SetPendingOutputTimeUs(stats->RunStatusTimeMetrics[NDq::ERunStatus::PendingOutput].MicroSeconds());
- s->SetFinishTimeUs(stats->RunStatusTimeMetrics[NDq::ERunStatus::Finished].MicroSeconds());
- static_assert(NDq::TRunStatusTimeMetrics::StatusesCount == 3); // Add all statuses here
-
- //s->SetTotalTime(stats->TotalTime.MilliSeconds());
- s->SetWaitTimeUs(stats->WaitTime.MicroSeconds());
- s->SetWaitOutputTimeUs(stats->WaitOutputTime.MicroSeconds());
-
- //s->SetMkqlTotalNodes(stats->MkqlTotalNodes);
- //s->SetMkqlCodegenFunctions(stats->MkqlCodegenFunctions);
- //s->SetCodeGenTotalInstructions(stats->CodeGenTotalInstructions);
- //s->SetCodeGenTotalFunctions(stats->CodeGenTotalFunctions);
- //s->SetCodeGenFullTime(stats->CodeGenFullTime);
- //s->SetCodeGenFinalizeTime(stats->CodeGenFinalizeTime);
- //s->SetCodeGenModulePassTime(stats->CodeGenModulePassTime);
-
- for (const auto& [id, ss] : stats->OutputChannels) {
- ToProto(s->AddOutputChannels(), ss);
- }
-
- for (const auto& [id, ss] : stats->InputChannels) {
- ToProto(s->AddInputChannels(), ss);
- }
-
- for (const auto& [id, ss] : stats->Sources) {
- ToProto(s->AddSources(), ss);
- }
-
- response.Save(&output);
-
+ GetStats(taskId).Save(&output);
break;
}
case NDqProto::TCommandHeader::GET_STATS_INPUT: {
diff --git a/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp b/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp
index cdcbd32ef5d..dfc063e903d 100644
--- a/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp
+++ b/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp
@@ -45,87 +45,6 @@ extern "C" int kill(int pid, int sig);
extern "C" int waitpid(int pid, int* status, int options);
#endif
-template<typename T>
-void FromProto(TDqOutputChannelStats* s, const T& f)
-{
- s->ChannelId = f.GetChannelId();
- s->Chunks = f.GetChunks();
- s->Bytes = f.GetBytes();
- s->RowsIn = f.GetRowsIn();
- s->RowsOut = f.GetRowsOut();
- s->MaxMemoryUsage = f.GetMaxMemoryUsage();
- //s->StartTs = TInstant::MilliSeconds(f.GetStartTs());
- //s->FinishTs = TInstant::MilliSeconds(f.GetFinishTs());
-}
-
-template<typename T>
-void FromProto(TDqInputChannelStats* s, const T& f)
-{
- s->ChannelId = f.GetChannelId();
- s->Chunks = f.GetChunks();
- s->Bytes = f.GetBytes();
- s->RowsIn = f.GetRowsIn();
- s->RowsOut = f.GetRowsOut();
- s->MaxMemoryUsage = f.GetMaxMemoryUsage();
- //s->StartTs = TInstant::MilliSeconds(f.GetStartTs());
- //s->FinishTs = TInstant::MilliSeconds(f.GetFinishTs());
- s->DeserializationTime = TDuration::MicroSeconds(f.GetDeserializationTimeUs());
-}
-
-template<typename T>
-void FromProto(TDqSourceStats* s, const T& f)
-{
- s->InputIndex = f.GetInputIndex();
- s->Chunks = f.GetChunks();
- s->Bytes = f.GetBytes();
- s->RowsIn = f.GetRowsIn();
- s->RowsOut = f.GetRowsOut();
- s->MaxMemoryUsage = f.GetMaxMemoryUsage();
- //s->StartTs = TInstant::MilliSeconds(f.GetStartTs());
- //s->FinishTs = TInstant::MilliSeconds(f.GetFinishTs());
-}
-
-template<typename T>
-void FromProto(TDqSinkStats* s, const T& f)
-{
- s->Chunks = f.GetChunks();
- s->Bytes = f.GetBytes();
- s->RowsIn = f.GetRowsIn();
- s->RowsOut = f.GetRowsOut();
- s->MaxMemoryUsage = f.GetMaxMemoryUsage();
-}
-
-template<typename T>
-void FromProto(TDqTaskRunnerStats* s, const T& f)
-{
- //s->StartTs = TInstant::MilliSeconds(f.GetStartTs());
- //s->FinishTs = TInstant::MilliSeconds(f.GetFinishTs());
- s->BuildCpuTime = TDuration::MicroSeconds(f.GetBuildCpuTimeUs());
- s->ComputeCpuTime = TDuration::MicroSeconds(f.GetComputeCpuTimeUs());
- s->RunStatusTimeMetrics.Load(ERunStatus::PendingInput, TDuration::MicroSeconds(f.GetPendingInputTimeUs()));
- s->RunStatusTimeMetrics.Load(ERunStatus::PendingOutput, TDuration::MicroSeconds(f.GetPendingOutputTimeUs()));
- s->RunStatusTimeMetrics.Load(ERunStatus::Finished, TDuration::MicroSeconds(f.GetFinishTimeUs()));
- //s->TotalTime = TDuration::MilliSeconds(f.GetTotalTime());
- s->WaitTime = TDuration::MicroSeconds(f.GetWaitTimeUs());
- s->WaitOutputTime = TDuration::MicroSeconds(f.GetWaitOutputTimeUs());
-
- //s->MkqlTotalNodes = f.GetMkqlTotalNodes();
- //s->MkqlCodegenFunctions = f.GetMkqlCodegenFunctions();
- //s->CodeGenTotalInstructions = f.GetCodeGenTotalInstructions();
- //s->CodeGenTotalFunctions = f.GetCodeGenTotalFunctions();
- //s->CodeGenFullTime = f.GetCodeGenFullTime();
- //s->CodeGenFinalizeTime = f.GetCodeGenFinalizeTime();
- //s->CodeGenModulePassTime = f.GetCodeGenModulePassTime();
-
- for (const auto& input : f.GetInputChannels()) {
- FromProto(const_cast<TDqInputChannelStats*>(s->InputChannels[input.GetChannelId()]), input);
- }
-
- for (const auto& output : f.GetOutputChannels()) {
- FromProto(const_cast<TDqOutputChannelStats*>(s->OutputChannels[output.GetChannelId()]), output);
- }
-}
-
class TChildProcess: private TNonCopyable {
public:
TChildProcess(const TString& exeName, const TVector<TString>& args, const THashMap<TString, TString>& env, const TString& workDir)
@@ -658,7 +577,7 @@ public:
NDqProto::TGetStatsInputResponse response;
response.Load(&Input);
- FromProto(&Stats, response.GetStats());
+ Stats.FromProto(response.GetStats());
return &Stats;
} catch (...) {
TaskRunner->RaiseException();
@@ -820,7 +739,7 @@ public:
NDqProto::TGetStatsSourceResponse response;
response.Load(&Input);
- FromProto(&Stats, response.GetStats());
+ Stats.FromProto(response.GetStats());
return &Stats;
} catch (...) {
TaskRunner->RaiseException();
@@ -1055,7 +974,7 @@ public:
NDqProto::TGetStatsOutputResponse response;
response.Load(&Input);
- FromProto(&Stats, response.GetStats());
+ Stats.FromProto(response.GetStats());
return &Stats;
} catch (...) {
TaskRunner->RaiseException();
@@ -1211,7 +1130,7 @@ public:
NDqProto::TSinkStatsResponse response;
response.Load(&Input);
- FromProto(&Stats, response.GetStats());
+ Stats.FromProto(response.GetStats());
return &Stats;
} catch (...) {
TaskRunner->RaiseException();
@@ -1612,7 +1531,7 @@ public:
NDqProto::TGetStatsResponse response;
response.Load(&Delegate->GetInput());
- FromProto(&Stats, response.GetStats());
+ Stats.FromProto(response.GetStats());
return &Stats;
} catch (...) {
diff --git a/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp b/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp
index 00d60ca63c3..af7151f48b5 100644
--- a/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp
+++ b/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp
@@ -345,16 +345,17 @@ private:
}
TVector<NDqProto::TData> chunks;
+ NDqProto::TPopResponse lastPop;
NDqProto::TPopResponse response;
for (;maxChunks && remain > 0 && !isFinished && hasData; maxChunks--, remain -= dataSize) {
NDqProto::TData data;
- NDqProto::TPopResponse pop = channel->Pop(data, remain);
+ lastPop = std::move(channel->Pop(data, remain));
- for (auto& metric : pop.GetMetric()) {
+ for (auto& metric : lastPop.GetMetric()) {
*response.AddMetric() = metric;
}
- hasData = pop.GetResult();
+ hasData = lastPop.GetResult();
dataSize = data.GetRaw().size();
isFinished = !hasData && channel->IsFinished();
response.SetResult(response.GetResult() || hasData);
@@ -365,6 +366,11 @@ private:
}
}
+ TDqTaskRunnerStatsInplace stats;
+ NDqProto::TGetStatsResponse pbStats;
+ lastPop.GetStats().UnpackTo(&pbStats);
+ stats.FromProto(pbStats.GetStats());
+
actorSystem->Send(
new IEventHandle(
replyTo,
@@ -374,7 +380,8 @@ private:
std::move(chunks),
isFinished,
changed,
- GetSensors(response)),
+ GetSensors(response),
+ TDqTaskRunnerStatsView(std::move(stats))),
/*flags=*/0,
cookie));
} catch (...) {