diff options
author | ulya-sidorina <[email protected]> | 2023-01-10 17:22:37 +0300 |
---|---|---|
committer | ulya-sidorina <[email protected]> | 2023-01-10 17:22:37 +0300 |
commit | 7e9cd0d757a65e066603131c6d54fb93e68ab5cb (patch) | |
tree | f92d8e330b773250b1587f57d8e2eb97733970c2 | |
parent | 9c2656be278f707dd2ed430b385352254ea2ec6a (diff) |
collect input transform stats
feature(kqp): collect input transform stats
11 files changed, 130 insertions, 8 deletions
diff --git a/ydb/core/kqp/compute_actor/kqp_compute_actor_helpers.cpp b/ydb/core/kqp/compute_actor/kqp_compute_actor_helpers.cpp index 51e854da17e..cb2c43db43b 100644 --- a/ydb/core/kqp/compute_actor/kqp_compute_actor_helpers.cpp +++ b/ydb/core/kqp/compute_actor/kqp_compute_actor_helpers.cpp @@ -25,4 +25,37 @@ bool FindSchemeErrorInIssues(const Ydb::StatusIds::StatusCode& status, const NYq return schemeError; } +void FillTaskInputStats(const NYql::NDqProto::TDqTask& task, NYql::NDqProto::TDqTaskStats& taskStats) { + THashMap<ui32, TString> inputTables; + + for (ui32 inputIndex = 0; inputIndex < task.InputsSize(); ++inputIndex) { + const auto& taskInput = task.GetInputs(inputIndex); + if (taskInput.HasTransform()) { + const auto& transform = taskInput.GetTransform(); + YQL_ENSURE(transform.GetType() == "StreamLookupInputTransformer", + "Unexpected input transform type: " << transform.GetType()); + + const google::protobuf::Any &settingsAny = transform.GetSettings(); + YQL_ENSURE(settingsAny.Is<NKikimrKqp::TKqpStreamLookupSettings>(), "Expected settings type: " + << NKikimrKqp::TKqpStreamLookupSettings::descriptor()->full_name() + << " , but got: " << settingsAny.type_url()); + + NKikimrKqp::TKqpStreamLookupSettings settings; + YQL_ENSURE(settingsAny.UnpackTo(&settings), "Failed to unpack settings"); + + inputTables.insert({inputIndex, settings.GetTable().GetPath()}); + } + } + + for (const auto& transformerStats : taskStats.GetInputTransforms()) { + auto tableIt = inputTables.find(transformerStats.GetInputIndex()); + YQL_ENSURE(tableIt != inputTables.end()); + + auto* tableStats = taskStats.AddTables(); + tableStats->SetTablePath(tableIt->second); + tableStats->SetReadRows(transformerStats.GetRowsOut()); + tableStats->SetReadBytes(transformerStats.GetBytes()); + } +} + } // namespace NKikimr::NKqp::NComputeActor diff --git a/ydb/core/kqp/compute_actor/kqp_compute_actor_impl.h b/ydb/core/kqp/compute_actor/kqp_compute_actor_impl.h index b32537d9514..cab30fb4a50 100644 --- a/ydb/core/kqp/compute_actor/kqp_compute_actor_impl.h +++ b/ydb/core/kqp/compute_actor/kqp_compute_actor_impl.h @@ -12,6 +12,11 @@ namespace NKqp { using namespace NYql; using namespace NYql::NDq; +namespace NComputeActor { +void FillTaskInputStats(const NDqProto::TDqTask& task, NDqProto::TDqTaskStats& taskStats); + +} // namespace NComputeActor + class TKqpTaskRunnerExecutionContext : public IDqTaskRunnerExecutionContext { public: TKqpTaskRunnerExecutionContext(ui64 txId, bool withSpilling, IDqChannelStorage::TWakeUpCallback&& wakeUp, diff --git a/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp index ed6caca25ad..c9131f3efdf 100644 --- a/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp +++ b/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp @@ -192,6 +192,11 @@ public: tableStats->MutableExtra()->PackFrom(tableExtraStats); } } + + if (last && dst->TasksSize() > 0) { + YQL_ENSURE(dst->TasksSize() == 1); + NComputeActor::FillTaskInputStats(GetTask(), *dst->MutableTasks(0)); + } } private: diff --git a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp index dc5f72a4042..9beb26a3fa8 100644 --- a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp +++ b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp @@ -284,6 +284,11 @@ public: // dst->MutableExtra()->PackFrom(extraStats); } + + if (last && dst->TasksSize() > 0) { + YQL_ENSURE(dst->TasksSize() == 1); + NComputeActor::FillTaskInputStats(GetTask(), *dst->MutableTasks(0)); + } } protected: diff --git a/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp b/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp index f8fa2ef0179..c591b2d63f5 100644 --- a/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp +++ b/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp @@ -3297,7 +3297,7 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) { } TExecDataQuerySettings querySettings; - querySettings.CollectQueryStats(ECollectQueryStatsMode::Full); + querySettings.CollectQueryStats(ECollectQueryStatsMode::Profile); { auto result = session.ExecuteDataQuery(R"( @@ -3314,6 +3314,13 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) { NJson::ReadJsonTree(result.GetQueryPlan(), &plan, true); auto streamLookup = FindPlanNodeByKv(plan, "Node Type", "TableLookup"); UNIT_ASSERT(streamLookup.IsDefined()); + + auto stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 1); + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access().size(), 2); + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).name(), "/Root/EightShard"); + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(1).name(), "/Root/KeyValue"); + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(1).reads().rows(), 2); } { @@ -3341,6 +3348,12 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) { NJson::ReadJsonTree(result.GetQueryPlan(), &plan, true); auto streamLookup = FindPlanNodeByKv(plan, "Node Type", "TableLookup"); UNIT_ASSERT(streamLookup.IsDefined()); + + auto stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 1); + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access().size(), 1); + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).name(), "/Root/KeyValue"); + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).reads().rows(), 2); } { @@ -3357,6 +3370,12 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) { NJson::ReadJsonTree(result.GetQueryPlan(), &plan, true); auto streamLookup = FindPlanNodeByKv(plan, "Node Type", "TableLookup"); UNIT_ASSERT(streamLookup.IsDefined()); + + auto stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 1); + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access().size(), 1); + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).name(), "/Root/KeyValue"); + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).reads().rows(), 1); } } diff --git a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp index d3d0cb9a703..b84e55a0e00 100644 --- a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp +++ b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp @@ -124,7 +124,7 @@ private: void OnStateRequest(TEvDqCompute::TEvStateRequest::TPtr& ev) { CA_LOG_T("Got TEvStateRequest from actor " << ev->Sender << " PingCookie: " << ev->Cookie); if (!SentStatsRequest) { - Send(TaskRunnerActorId, new NTaskRunnerActor::TEvStatistics(GetIds(SinksMap))); + Send(TaskRunnerActorId, new NTaskRunnerActor::TEvStatistics(GetIds(SinksMap), GetIds(InputTransformsMap))); SentStatsRequest = true; } WaitingForStateResponse.push_back({ev->Sender, ev->Cookie}); @@ -187,6 +187,11 @@ private: return TaskRunnerStats.GetSinkStats(outputIdx); } + const TDqAsyncInputBufferStats* GetInputTransformStats(ui64 inputIdx, const TAsyncInputTransformInfo& inputTransformInfo) const override { + Y_UNUSED(inputTransformInfo); + return TaskRunnerStats.GetInputTransformStats(inputIdx); + } + void DrainOutputChannel(TOutputChannelInfo& outputChannel, const TDqComputeActorChannels::TPeerState& peerState) override { YQL_ENSURE(!outputChannel.Finished || Checkpoints); @@ -809,6 +814,7 @@ private: void AskContinueRun(std::unique_ptr<NTaskRunnerActor::TEvContinueRun> continueRunEvent) { continueRunEvent->SinkIds = GetIds(SinksMap); + continueRunEvent->InputTransformIds = GetIds(InputTransformsMap); if (!UseCpuQuota()) { Send(TaskRunnerActorId, continueRunEvent.release()); diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h index e8b46850b39..97a689d09df 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h @@ -1843,6 +1843,11 @@ private: return sinkInfo.Buffer ? sinkInfo.Buffer->GetStats() : nullptr; } + virtual const TDqAsyncInputBufferStats* GetInputTransformStats(ui64 inputIdx, const TAsyncInputTransformInfo& inputTransformInfo) const { + Y_UNUSED(inputIdx); + return inputTransformInfo.Buffer ? inputTransformInfo.Buffer->GetStats() : nullptr; + } + public: void FillStats(NDqProto::TDqComputeActorStats* dst, bool last) { if (!BasicStats) { @@ -1921,6 +1926,24 @@ public: } } + for (auto& [inputIndex, transformInfo] : InputTransformsMap) { + auto* transformStats = GetInputTransformStats(inputIndex, transformInfo); + if (transformStats && GetProfileStats()) { + YQL_ENSURE(transformStats); + ui64 ingressBytes = transformInfo.AsyncInput ? transformInfo.AsyncInput->GetIngressBytes() : 0; + + auto* protoTransform = protoTask->AddInputTransforms(); + protoTransform->SetInputIndex(inputIndex); + protoTransform->SetChunks(transformStats->Chunks); + protoTransform->SetBytes(transformStats->Bytes); + protoTransform->SetRowsIn(transformStats->RowsIn); + protoTransform->SetRowsOut(transformStats->RowsOut); + protoTransform->SetIngressBytes(ingressBytes); + + protoTransform->SetMaxMemoryUsage(transformStats->MaxMemoryUsage); + } + } + for (auto& [name, bytes] : Egress) { auto* egressStats = protoTask->AddEgress(); egressStats->SetName(name); diff --git a/ydb/library/yql/dq/actors/protos/dq_stats.proto b/ydb/library/yql/dq/actors/protos/dq_stats.proto index 1647c92558d..a0486c2c5c5 100644 --- a/ydb/library/yql/dq/actors/protos/dq_stats.proto +++ b/ydb/library/yql/dq/actors/protos/dq_stats.proto @@ -154,6 +154,7 @@ message TDqTaskStats { repeated TDqInputChannelStats InputChannels = 151; repeated TDqAsyncOutputBufferStats Sinks = 152; repeated TDqOutputChannelStats OutputChannels = 153; + repeated TDqAsyncInputBufferStats InputTransforms = 155; google.protobuf.Any Extra = 200; } diff --git a/ydb/library/yql/dq/actors/task_runner/events.h b/ydb/library/yql/dq/actors/task_runner/events.h index bea7d5becf2..5460d150186 100644 --- a/ydb/library/yql/dq/actors/task_runner/events.h +++ b/ydb/library/yql/dq/actors/task_runner/events.h @@ -335,6 +335,7 @@ struct TEvContinueRun TMaybe<TCheckpointRequest> CheckpointRequest = Nothing(); bool CheckpointOnly = false; TVector<ui32> SinkIds; + TVector<ui32> InputTransformIds; }; struct TEvAsyncInputPushFinished @@ -403,10 +404,14 @@ struct TEvLoadTaskRunnerFromStateDone : NActors::TEventLocal<TEvLoadTaskRunnerFr struct TEvStatistics : NActors::TEventLocal<TEvStatistics, TTaskRunnerEvents::ES_STATISTICS> { - explicit TEvStatistics(TVector<ui32>&& sinkIds) : SinkIds(std::move(sinkIds)), Stats() { + explicit TEvStatistics(TVector<ui32>&& sinkIds, TVector<ui32>&& inputTransformIds) + : SinkIds(std::move(sinkIds)) + , InputTransformIds(std::move(inputTransformIds)) + , Stats() { } TVector<ui32> SinkIds; + TVector<ui32> InputTransformIds; NDq::TDqTaskRunnerStatsView Stats; }; diff --git a/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp b/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp index fb5966b35f5..6ad40817260 100644 --- a/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp +++ b/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp @@ -89,7 +89,14 @@ private: for (const auto sinkId : ev->Get()->SinkIds) { sinkStats[sinkId] = TaskRunner->GetSink(sinkId)->GetStats(); } - ev->Get()->Stats = TDqTaskRunnerStatsView(TaskRunner->GetStats(), std::move(sinkStats)); + + THashMap<ui32, const TDqAsyncInputBufferStats*> inputTransformStats; + for (const auto inputTransformId : ev->Get()->InputTransformIds) { + inputTransformStats[inputTransformId] = TaskRunner->GetInputTransform(inputTransformId).second->GetStats(); + } + + ev->Get()->Stats = TDqTaskRunnerStatsView(TaskRunner->GetStats(), std::move(sinkStats), + std::move(inputTransformStats)); Send( ev->Sender, ev->Release().Release(), @@ -215,14 +222,20 @@ private: } { - auto st = MakeHolder<TEvStatistics>(std::move(ev->Get()->SinkIds)); + auto st = MakeHolder<TEvStatistics>(std::move(ev->Get()->SinkIds), std::move(ev->Get()->InputTransformIds)); TaskRunner->UpdateStats(); THashMap<ui32, const TDqAsyncOutputBufferStats*> sinkStats; for (const auto sinkId : st->SinkIds) { sinkStats[sinkId] = TaskRunner->GetSink(sinkId)->GetStats(); } - st->Stats = TDqTaskRunnerStatsView(TaskRunner->GetStats(), std::move(sinkStats)); + + THashMap<ui32, const TDqAsyncInputBufferStats*> inputTransformStats; + for (const auto inputTransformId : st->InputTransformIds) { // TODO + inputTransformStats[inputTransformId] = TaskRunner->GetInputTransform(inputTransformId).second->GetStats(); + } + + st->Stats = TDqTaskRunnerStatsView(TaskRunner->GetStats(), std::move(sinkStats), std::move(inputTransformStats)); Send(ev->Sender, st.Release()); } diff --git a/ydb/library/yql/dq/runtime/dq_tasks_runner.h b/ydb/library/yql/dq/runtime/dq_tasks_runner.h index 433f46edb08..262fe77413e 100644 --- a/ydb/library/yql/dq/runtime/dq_tasks_runner.h +++ b/ydb/library/yql/dq/runtime/dq_tasks_runner.h @@ -188,10 +188,12 @@ public: , IsDefined(true) { } - TDqTaskRunnerStatsView(const TDqTaskRunnerStats* stats, THashMap<ui32, const TDqAsyncOutputBufferStats*>&& sinkStats) + TDqTaskRunnerStatsView(const TDqTaskRunnerStats* stats, THashMap<ui32, const TDqAsyncOutputBufferStats*>&& sinkStats, + THashMap<ui32, const TDqAsyncInputBufferStats*>&& inputTransformStats) : StatsPtr(stats) , IsDefined(true) - , SinkStats(std::move(sinkStats)) { + , SinkStats(std::move(sinkStats)) + , InputTransformStats(std::move(inputTransformStats)) { } const TTaskRunnerStatsBase* Get() { @@ -209,10 +211,15 @@ public: return SinkStats.at(sinkId); } + const TDqAsyncInputBufferStats* GetInputTransformStats(ui32 inputTransformId) const { + return InputTransformStats.at(inputTransformId); + } + private: const TDqTaskRunnerStats* StatsPtr; bool IsDefined; THashMap<ui32, const TDqAsyncOutputBufferStats*> SinkStats; + THashMap<ui32, const TDqAsyncInputBufferStats*> InputTransformStats; }; struct TDqTaskRunnerContext { |