summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorulya-sidorina <[email protected]>2023-01-10 17:22:37 +0300
committerulya-sidorina <[email protected]>2023-01-10 17:22:37 +0300
commit7e9cd0d757a65e066603131c6d54fb93e68ab5cb (patch)
treef92d8e330b773250b1587f57d8e2eb97733970c2
parent9c2656be278f707dd2ed430b385352254ea2ec6a (diff)
collect input transform stats
feature(kqp): collect input transform stats
-rw-r--r--ydb/core/kqp/compute_actor/kqp_compute_actor_helpers.cpp33
-rw-r--r--ydb/core/kqp/compute_actor/kqp_compute_actor_impl.h5
-rw-r--r--ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp5
-rw-r--r--ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp5
-rw-r--r--ydb/core/kqp/ut/opt/kqp_ne_ut.cpp21
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp8
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h23
-rw-r--r--ydb/library/yql/dq/actors/protos/dq_stats.proto1
-rw-r--r--ydb/library/yql/dq/actors/task_runner/events.h7
-rw-r--r--ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp19
-rw-r--r--ydb/library/yql/dq/runtime/dq_tasks_runner.h11
11 files changed, 130 insertions, 8 deletions
diff --git a/ydb/core/kqp/compute_actor/kqp_compute_actor_helpers.cpp b/ydb/core/kqp/compute_actor/kqp_compute_actor_helpers.cpp
index 51e854da17e..cb2c43db43b 100644
--- a/ydb/core/kqp/compute_actor/kqp_compute_actor_helpers.cpp
+++ b/ydb/core/kqp/compute_actor/kqp_compute_actor_helpers.cpp
@@ -25,4 +25,37 @@ bool FindSchemeErrorInIssues(const Ydb::StatusIds::StatusCode& status, const NYq
return schemeError;
}
+void FillTaskInputStats(const NYql::NDqProto::TDqTask& task, NYql::NDqProto::TDqTaskStats& taskStats) {
+ THashMap<ui32, TString> inputTables;
+
+ for (ui32 inputIndex = 0; inputIndex < task.InputsSize(); ++inputIndex) {
+ const auto& taskInput = task.GetInputs(inputIndex);
+ if (taskInput.HasTransform()) {
+ const auto& transform = taskInput.GetTransform();
+ YQL_ENSURE(transform.GetType() == "StreamLookupInputTransformer",
+ "Unexpected input transform type: " << transform.GetType());
+
+ const google::protobuf::Any &settingsAny = transform.GetSettings();
+ YQL_ENSURE(settingsAny.Is<NKikimrKqp::TKqpStreamLookupSettings>(), "Expected settings type: "
+ << NKikimrKqp::TKqpStreamLookupSettings::descriptor()->full_name()
+ << " , but got: " << settingsAny.type_url());
+
+ NKikimrKqp::TKqpStreamLookupSettings settings;
+ YQL_ENSURE(settingsAny.UnpackTo(&settings), "Failed to unpack settings");
+
+ inputTables.insert({inputIndex, settings.GetTable().GetPath()});
+ }
+ }
+
+ for (const auto& transformerStats : taskStats.GetInputTransforms()) {
+ auto tableIt = inputTables.find(transformerStats.GetInputIndex());
+ YQL_ENSURE(tableIt != inputTables.end());
+
+ auto* tableStats = taskStats.AddTables();
+ tableStats->SetTablePath(tableIt->second);
+ tableStats->SetReadRows(transformerStats.GetRowsOut());
+ tableStats->SetReadBytes(transformerStats.GetBytes());
+ }
+}
+
} // namespace NKikimr::NKqp::NComputeActor
diff --git a/ydb/core/kqp/compute_actor/kqp_compute_actor_impl.h b/ydb/core/kqp/compute_actor/kqp_compute_actor_impl.h
index b32537d9514..cab30fb4a50 100644
--- a/ydb/core/kqp/compute_actor/kqp_compute_actor_impl.h
+++ b/ydb/core/kqp/compute_actor/kqp_compute_actor_impl.h
@@ -12,6 +12,11 @@ namespace NKqp {
using namespace NYql;
using namespace NYql::NDq;
+namespace NComputeActor {
+void FillTaskInputStats(const NDqProto::TDqTask& task, NDqProto::TDqTaskStats& taskStats);
+
+} // namespace NComputeActor
+
class TKqpTaskRunnerExecutionContext : public IDqTaskRunnerExecutionContext {
public:
TKqpTaskRunnerExecutionContext(ui64 txId, bool withSpilling, IDqChannelStorage::TWakeUpCallback&& wakeUp,
diff --git a/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp
index ed6caca25ad..c9131f3efdf 100644
--- a/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp
+++ b/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp
@@ -192,6 +192,11 @@ public:
tableStats->MutableExtra()->PackFrom(tableExtraStats);
}
}
+
+ if (last && dst->TasksSize() > 0) {
+ YQL_ENSURE(dst->TasksSize() == 1);
+ NComputeActor::FillTaskInputStats(GetTask(), *dst->MutableTasks(0));
+ }
}
private:
diff --git a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp
index dc5f72a4042..9beb26a3fa8 100644
--- a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp
+++ b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp
@@ -284,6 +284,11 @@ public:
// dst->MutableExtra()->PackFrom(extraStats);
}
+
+ if (last && dst->TasksSize() > 0) {
+ YQL_ENSURE(dst->TasksSize() == 1);
+ NComputeActor::FillTaskInputStats(GetTask(), *dst->MutableTasks(0));
+ }
}
protected:
diff --git a/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp b/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp
index f8fa2ef0179..c591b2d63f5 100644
--- a/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp
+++ b/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp
@@ -3297,7 +3297,7 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) {
}
TExecDataQuerySettings querySettings;
- querySettings.CollectQueryStats(ECollectQueryStatsMode::Full);
+ querySettings.CollectQueryStats(ECollectQueryStatsMode::Profile);
{
auto result = session.ExecuteDataQuery(R"(
@@ -3314,6 +3314,13 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) {
NJson::ReadJsonTree(result.GetQueryPlan(), &plan, true);
auto streamLookup = FindPlanNodeByKv(plan, "Node Type", "TableLookup");
UNIT_ASSERT(streamLookup.IsDefined());
+
+ auto stats = NYdb::TProtoAccessor::GetProto(*result.GetStats());
+ UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 1);
+ UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access().size(), 2);
+ UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).name(), "/Root/EightShard");
+ UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(1).name(), "/Root/KeyValue");
+ UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(1).reads().rows(), 2);
}
{
@@ -3341,6 +3348,12 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) {
NJson::ReadJsonTree(result.GetQueryPlan(), &plan, true);
auto streamLookup = FindPlanNodeByKv(plan, "Node Type", "TableLookup");
UNIT_ASSERT(streamLookup.IsDefined());
+
+ auto stats = NYdb::TProtoAccessor::GetProto(*result.GetStats());
+ UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 1);
+ UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access().size(), 1);
+ UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).name(), "/Root/KeyValue");
+ UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).reads().rows(), 2);
}
{
@@ -3357,6 +3370,12 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) {
NJson::ReadJsonTree(result.GetQueryPlan(), &plan, true);
auto streamLookup = FindPlanNodeByKv(plan, "Node Type", "TableLookup");
UNIT_ASSERT(streamLookup.IsDefined());
+
+ auto stats = NYdb::TProtoAccessor::GetProto(*result.GetStats());
+ UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 1);
+ UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access().size(), 1);
+ UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).name(), "/Root/KeyValue");
+ UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).reads().rows(), 1);
}
}
diff --git a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp
index d3d0cb9a703..b84e55a0e00 100644
--- a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp
+++ b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp
@@ -124,7 +124,7 @@ private:
void OnStateRequest(TEvDqCompute::TEvStateRequest::TPtr& ev) {
CA_LOG_T("Got TEvStateRequest from actor " << ev->Sender << " PingCookie: " << ev->Cookie);
if (!SentStatsRequest) {
- Send(TaskRunnerActorId, new NTaskRunnerActor::TEvStatistics(GetIds(SinksMap)));
+ Send(TaskRunnerActorId, new NTaskRunnerActor::TEvStatistics(GetIds(SinksMap), GetIds(InputTransformsMap)));
SentStatsRequest = true;
}
WaitingForStateResponse.push_back({ev->Sender, ev->Cookie});
@@ -187,6 +187,11 @@ private:
return TaskRunnerStats.GetSinkStats(outputIdx);
}
+ const TDqAsyncInputBufferStats* GetInputTransformStats(ui64 inputIdx, const TAsyncInputTransformInfo& inputTransformInfo) const override {
+ Y_UNUSED(inputTransformInfo);
+ return TaskRunnerStats.GetInputTransformStats(inputIdx);
+ }
+
void DrainOutputChannel(TOutputChannelInfo& outputChannel, const TDqComputeActorChannels::TPeerState& peerState) override {
YQL_ENSURE(!outputChannel.Finished || Checkpoints);
@@ -809,6 +814,7 @@ private:
void AskContinueRun(std::unique_ptr<NTaskRunnerActor::TEvContinueRun> continueRunEvent) {
continueRunEvent->SinkIds = GetIds(SinksMap);
+ continueRunEvent->InputTransformIds = GetIds(InputTransformsMap);
if (!UseCpuQuota()) {
Send(TaskRunnerActorId, continueRunEvent.release());
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
index e8b46850b39..97a689d09df 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
@@ -1843,6 +1843,11 @@ private:
return sinkInfo.Buffer ? sinkInfo.Buffer->GetStats() : nullptr;
}
+ virtual const TDqAsyncInputBufferStats* GetInputTransformStats(ui64 inputIdx, const TAsyncInputTransformInfo& inputTransformInfo) const {
+ Y_UNUSED(inputIdx);
+ return inputTransformInfo.Buffer ? inputTransformInfo.Buffer->GetStats() : nullptr;
+ }
+
public:
void FillStats(NDqProto::TDqComputeActorStats* dst, bool last) {
if (!BasicStats) {
@@ -1921,6 +1926,24 @@ public:
}
}
+ for (auto& [inputIndex, transformInfo] : InputTransformsMap) {
+ auto* transformStats = GetInputTransformStats(inputIndex, transformInfo);
+ if (transformStats && GetProfileStats()) {
+ YQL_ENSURE(transformStats);
+ ui64 ingressBytes = transformInfo.AsyncInput ? transformInfo.AsyncInput->GetIngressBytes() : 0;
+
+ auto* protoTransform = protoTask->AddInputTransforms();
+ protoTransform->SetInputIndex(inputIndex);
+ protoTransform->SetChunks(transformStats->Chunks);
+ protoTransform->SetBytes(transformStats->Bytes);
+ protoTransform->SetRowsIn(transformStats->RowsIn);
+ protoTransform->SetRowsOut(transformStats->RowsOut);
+ protoTransform->SetIngressBytes(ingressBytes);
+
+ protoTransform->SetMaxMemoryUsage(transformStats->MaxMemoryUsage);
+ }
+ }
+
for (auto& [name, bytes] : Egress) {
auto* egressStats = protoTask->AddEgress();
egressStats->SetName(name);
diff --git a/ydb/library/yql/dq/actors/protos/dq_stats.proto b/ydb/library/yql/dq/actors/protos/dq_stats.proto
index 1647c92558d..a0486c2c5c5 100644
--- a/ydb/library/yql/dq/actors/protos/dq_stats.proto
+++ b/ydb/library/yql/dq/actors/protos/dq_stats.proto
@@ -154,6 +154,7 @@ message TDqTaskStats {
repeated TDqInputChannelStats InputChannels = 151;
repeated TDqAsyncOutputBufferStats Sinks = 152;
repeated TDqOutputChannelStats OutputChannels = 153;
+ repeated TDqAsyncInputBufferStats InputTransforms = 155;
google.protobuf.Any Extra = 200;
}
diff --git a/ydb/library/yql/dq/actors/task_runner/events.h b/ydb/library/yql/dq/actors/task_runner/events.h
index bea7d5becf2..5460d150186 100644
--- a/ydb/library/yql/dq/actors/task_runner/events.h
+++ b/ydb/library/yql/dq/actors/task_runner/events.h
@@ -335,6 +335,7 @@ struct TEvContinueRun
TMaybe<TCheckpointRequest> CheckpointRequest = Nothing();
bool CheckpointOnly = false;
TVector<ui32> SinkIds;
+ TVector<ui32> InputTransformIds;
};
struct TEvAsyncInputPushFinished
@@ -403,10 +404,14 @@ struct TEvLoadTaskRunnerFromStateDone : NActors::TEventLocal<TEvLoadTaskRunnerFr
struct TEvStatistics : NActors::TEventLocal<TEvStatistics, TTaskRunnerEvents::ES_STATISTICS>
{
- explicit TEvStatistics(TVector<ui32>&& sinkIds) : SinkIds(std::move(sinkIds)), Stats() {
+ explicit TEvStatistics(TVector<ui32>&& sinkIds, TVector<ui32>&& inputTransformIds)
+ : SinkIds(std::move(sinkIds))
+ , InputTransformIds(std::move(inputTransformIds))
+ , Stats() {
}
TVector<ui32> SinkIds;
+ TVector<ui32> InputTransformIds;
NDq::TDqTaskRunnerStatsView Stats;
};
diff --git a/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp b/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp
index fb5966b35f5..6ad40817260 100644
--- a/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp
+++ b/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp
@@ -89,7 +89,14 @@ private:
for (const auto sinkId : ev->Get()->SinkIds) {
sinkStats[sinkId] = TaskRunner->GetSink(sinkId)->GetStats();
}
- ev->Get()->Stats = TDqTaskRunnerStatsView(TaskRunner->GetStats(), std::move(sinkStats));
+
+ THashMap<ui32, const TDqAsyncInputBufferStats*> inputTransformStats;
+ for (const auto inputTransformId : ev->Get()->InputTransformIds) {
+ inputTransformStats[inputTransformId] = TaskRunner->GetInputTransform(inputTransformId).second->GetStats();
+ }
+
+ ev->Get()->Stats = TDqTaskRunnerStatsView(TaskRunner->GetStats(), std::move(sinkStats),
+ std::move(inputTransformStats));
Send(
ev->Sender,
ev->Release().Release(),
@@ -215,14 +222,20 @@ private:
}
{
- auto st = MakeHolder<TEvStatistics>(std::move(ev->Get()->SinkIds));
+ auto st = MakeHolder<TEvStatistics>(std::move(ev->Get()->SinkIds), std::move(ev->Get()->InputTransformIds));
TaskRunner->UpdateStats();
THashMap<ui32, const TDqAsyncOutputBufferStats*> sinkStats;
for (const auto sinkId : st->SinkIds) {
sinkStats[sinkId] = TaskRunner->GetSink(sinkId)->GetStats();
}
- st->Stats = TDqTaskRunnerStatsView(TaskRunner->GetStats(), std::move(sinkStats));
+
+ THashMap<ui32, const TDqAsyncInputBufferStats*> inputTransformStats;
+ for (const auto inputTransformId : st->InputTransformIds) { // TODO
+ inputTransformStats[inputTransformId] = TaskRunner->GetInputTransform(inputTransformId).second->GetStats();
+ }
+
+ st->Stats = TDqTaskRunnerStatsView(TaskRunner->GetStats(), std::move(sinkStats), std::move(inputTransformStats));
Send(ev->Sender, st.Release());
}
diff --git a/ydb/library/yql/dq/runtime/dq_tasks_runner.h b/ydb/library/yql/dq/runtime/dq_tasks_runner.h
index 433f46edb08..262fe77413e 100644
--- a/ydb/library/yql/dq/runtime/dq_tasks_runner.h
+++ b/ydb/library/yql/dq/runtime/dq_tasks_runner.h
@@ -188,10 +188,12 @@ public:
, IsDefined(true) {
}
- TDqTaskRunnerStatsView(const TDqTaskRunnerStats* stats, THashMap<ui32, const TDqAsyncOutputBufferStats*>&& sinkStats)
+ TDqTaskRunnerStatsView(const TDqTaskRunnerStats* stats, THashMap<ui32, const TDqAsyncOutputBufferStats*>&& sinkStats,
+ THashMap<ui32, const TDqAsyncInputBufferStats*>&& inputTransformStats)
: StatsPtr(stats)
, IsDefined(true)
- , SinkStats(std::move(sinkStats)) {
+ , SinkStats(std::move(sinkStats))
+ , InputTransformStats(std::move(inputTransformStats)) {
}
const TTaskRunnerStatsBase* Get() {
@@ -209,10 +211,15 @@ public:
return SinkStats.at(sinkId);
}
+ const TDqAsyncInputBufferStats* GetInputTransformStats(ui32 inputTransformId) const {
+ return InputTransformStats.at(inputTransformId);
+ }
+
private:
const TDqTaskRunnerStats* StatsPtr;
bool IsDefined;
THashMap<ui32, const TDqAsyncOutputBufferStats*> SinkStats;
+ THashMap<ui32, const TDqAsyncInputBufferStats*> InputTransformStats;
};
struct TDqTaskRunnerContext {