aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorulya-sidorina <yulia@ydb.tech>2023-02-23 20:32:27 +0300
committerulya-sidorina <yulia@ydb.tech>2023-02-23 20:32:27 +0300
commit09f337323884478aee3d0da15e9bbfd28411f6de (patch)
tree2d1b05498e53e7c449379784380ecf2d5e474ddc
parent73dbf37dfe3ffaf8923d0ce1f89c0b00148345fb (diff)
downloadydb-09f337323884478aee3d0da15e9bbfd28411f6de.tar.gz
fix stats collecting for stream lookup actor
bugfix(kqp): collect stats for stream lookup actor
-rw-r--r--ydb/core/kqp/compute_actor/kqp_compute_actor_helpers.cpp33
-rw-r--r--ydb/core/kqp/compute_actor/kqp_compute_actor_impl.h5
-rw-r--r--ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp5
-rw-r--r--ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp5
-rw-r--r--ydb/core/kqp/executer_actor/kqp_executer_stats.cpp7
-rw-r--r--ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp39
-rw-r--r--ydb/core/kqp/ut/query/kqp_stats_ut.cpp36
-rw-r--r--ydb/core/protos/kqp_stats.proto5
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h6
9 files changed, 89 insertions, 52 deletions
diff --git a/ydb/core/kqp/compute_actor/kqp_compute_actor_helpers.cpp b/ydb/core/kqp/compute_actor/kqp_compute_actor_helpers.cpp
index cb2c43db43..51e854da17 100644
--- a/ydb/core/kqp/compute_actor/kqp_compute_actor_helpers.cpp
+++ b/ydb/core/kqp/compute_actor/kqp_compute_actor_helpers.cpp
@@ -25,37 +25,4 @@ bool FindSchemeErrorInIssues(const Ydb::StatusIds::StatusCode& status, const NYq
return schemeError;
}
-void FillTaskInputStats(const NYql::NDqProto::TDqTask& task, NYql::NDqProto::TDqTaskStats& taskStats) {
- THashMap<ui32, TString> inputTables;
-
- for (ui32 inputIndex = 0; inputIndex < task.InputsSize(); ++inputIndex) {
- const auto& taskInput = task.GetInputs(inputIndex);
- if (taskInput.HasTransform()) {
- const auto& transform = taskInput.GetTransform();
- YQL_ENSURE(transform.GetType() == "StreamLookupInputTransformer",
- "Unexpected input transform type: " << transform.GetType());
-
- const google::protobuf::Any &settingsAny = transform.GetSettings();
- YQL_ENSURE(settingsAny.Is<NKikimrKqp::TKqpStreamLookupSettings>(), "Expected settings type: "
- << NKikimrKqp::TKqpStreamLookupSettings::descriptor()->full_name()
- << " , but got: " << settingsAny.type_url());
-
- NKikimrKqp::TKqpStreamLookupSettings settings;
- YQL_ENSURE(settingsAny.UnpackTo(&settings), "Failed to unpack settings");
-
- inputTables.insert({inputIndex, settings.GetTable().GetPath()});
- }
- }
-
- for (const auto& transformerStats : taskStats.GetInputTransforms()) {
- auto tableIt = inputTables.find(transformerStats.GetInputIndex());
- YQL_ENSURE(tableIt != inputTables.end());
-
- auto* tableStats = taskStats.AddTables();
- tableStats->SetTablePath(tableIt->second);
- tableStats->SetReadRows(transformerStats.GetRowsOut());
- tableStats->SetReadBytes(transformerStats.GetBytes());
- }
-}
-
} // namespace NKikimr::NKqp::NComputeActor
diff --git a/ydb/core/kqp/compute_actor/kqp_compute_actor_impl.h b/ydb/core/kqp/compute_actor/kqp_compute_actor_impl.h
index cab30fb4a5..b32537d951 100644
--- a/ydb/core/kqp/compute_actor/kqp_compute_actor_impl.h
+++ b/ydb/core/kqp/compute_actor/kqp_compute_actor_impl.h
@@ -12,11 +12,6 @@ namespace NKqp {
using namespace NYql;
using namespace NYql::NDq;
-namespace NComputeActor {
-void FillTaskInputStats(const NDqProto::TDqTask& task, NDqProto::TDqTaskStats& taskStats);
-
-} // namespace NComputeActor
-
class TKqpTaskRunnerExecutionContext : public IDqTaskRunnerExecutionContext {
public:
TKqpTaskRunnerExecutionContext(ui64 txId, bool withSpilling, IDqChannelStorage::TWakeUpCallback&& wakeUp,
diff --git a/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp
index 4a8ca2587d..4be90552a9 100644
--- a/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp
+++ b/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp
@@ -193,11 +193,6 @@ public:
tableStats->MutableExtra()->PackFrom(tableExtraStats);
}
}
-
- if (last && dst->TasksSize() > 0) {
- YQL_ENSURE(dst->TasksSize() == 1);
- NComputeActor::FillTaskInputStats(GetTask(), *dst->MutableTasks(0));
- }
}
private:
diff --git a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp
index f3098274db..911fb69ca1 100644
--- a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp
+++ b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp
@@ -285,11 +285,6 @@ public:
// dst->MutableExtra()->PackFrom(extraStats);
}
-
- if (last && dst->TasksSize() > 0) {
- YQL_ENSURE(dst->TasksSize() == 1);
- NComputeActor::FillTaskInputStats(GetTask(), *dst->MutableTasks(0));
- }
}
protected:
diff --git a/ydb/core/kqp/executer_actor/kqp_executer_stats.cpp b/ydb/core/kqp/executer_actor/kqp_executer_stats.cpp
index c8961e499f..3a59d76574 100644
--- a/ydb/core/kqp/executer_actor/kqp_executer_stats.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_executer_stats.cpp
@@ -136,6 +136,13 @@ void TQueryExecutionStats::AddComputeActorStats(ui32 /* nodeId */, NYql::NDqProt
tableAggr->SetWriteBytes(tableAggr->GetWriteBytes() + table.GetWriteBytes());
tableAggr->SetEraseRows(tableAggr->GetEraseRows() + table.GetEraseRows());
tableAggr->SetAffectedPartitions(tableAggr->GetAffectedPartitions() + table.GetAffectedPartitions());
+
+ NKqpProto::TKqpReadActorTableAggrExtraStats tableExtraStats;
+ if (table.GetExtra().UnpackTo(&tableExtraStats)) {
+ for (const auto& shardId : tableExtraStats.GetAffectedShards()) {
+ AffectedShards.insert(shardId);
+ }
+ }
}
}
diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp
index 95d0908192..ed1fee9b20 100644
--- a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp
+++ b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp
@@ -8,6 +8,7 @@
#include <ydb/core/kqp/common/kqp_resolve.h>
#include <ydb/core/kqp/gateway/kqp_gateway.h>
#include <ydb/core/protos/kqp.pb.h>
+#include <ydb/core/protos/kqp_stats.pb.h>
#include <ydb/core/tx/scheme_cache/scheme_cache.h>
#include <ydb/core/tx/datashard/datashard.h>
#include <ydb/core/kqp/common/kqp_event_ids.h>
@@ -28,7 +29,8 @@ public:
const NMiniKQL::TTypeEnvironment& typeEnv, const NMiniKQL::THolderFactory& holderFactory,
std::shared_ptr<NMiniKQL::TScopedAlloc>& alloc, NKikimrKqp::TKqpStreamLookupSettings&& settings)
: InputIndex(inputIndex), Input(input), ComputeActorId(computeActorId), TypeEnv(typeEnv)
- , HolderFactory(holderFactory), Alloc(alloc), TableId(MakeTableId(settings.GetTable()))
+ , HolderFactory(holderFactory), Alloc(alloc), TablePath(settings.GetTable().GetPath())
+ , TableId(MakeTableId(settings.GetTable()))
, Snapshot(settings.GetSnapshot().GetStep(), settings.GetSnapshot().GetTxId())
, LockTxId(settings.HasLockTxId() ? settings.GetLockTxId() : TMaybe<ui64>())
, ImmediateTx(settings.GetImmediateTx())
@@ -82,6 +84,34 @@ public:
return NKikimrServices::TActivity::KQP_STREAM_LOOKUP_ACTOR;
}
+ void FillExtraStats(NYql::NDqProto::TDqTaskStats* stats , bool last) override {
+ if (last) {
+ NYql::NDqProto::TDqTableStats* tableStats = nullptr;
+ for (auto& table : *stats->MutableTables()) {
+ if (table.GetTablePath() == TablePath) {
+ tableStats = &table;
+ }
+ }
+
+ if (!tableStats) {
+ tableStats = stats->AddTables();
+ tableStats->SetTablePath(TablePath);
+ }
+
+ // TODO: use evread statistics after KIKIMR-16924
+ tableStats->SetReadRows(tableStats->GetReadRows() + ReadRowsCount);
+ tableStats->SetReadBytes(tableStats->GetReadBytes() + ReadBytesCount);
+ tableStats->SetAffectedPartitions(tableStats->GetAffectedPartitions() + ReadsPerShard.size());
+
+ NKqpProto::TKqpReadActorTableAggrExtraStats tableExtraStats;
+ for (const auto& [shardId, _] : ReadsPerShard) {
+ tableExtraStats.AddAffectedShards(shardId);
+ }
+
+ tableStats->MutableExtra()->PackFrom(tableExtraStats);
+ }
+ }
+
private:
enum class EReadState {
Initial,
@@ -346,6 +376,8 @@ private:
}
batch.push_back(std::move(row));
+ ++ReadRowsCount;
+ ReadBytesCount += rowSize;
totalSize += rowSize;
}
@@ -580,6 +612,7 @@ private:
const NMiniKQL::TTypeEnvironment& TypeEnv;
const NMiniKQL::THolderFactory& HolderFactory;
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> Alloc;
+ const TString TablePath;
const TTableId TableId;
IKqpGateway::TKqpSnapshot Snapshot;
const TMaybe<ui64> LockTxId;
@@ -596,6 +629,10 @@ private:
NActors::TActorId SchemeCacheRequestTimeoutTimer;
TVector<NKikimrTxDataShard::TLock> Locks;
TVector<NKikimrTxDataShard::TLock> BrokenLocks;
+
+ // stats
+ ui64 ReadRowsCount = 0;
+ ui64 ReadBytesCount = 0;
};
} // namespace
diff --git a/ydb/core/kqp/ut/query/kqp_stats_ut.cpp b/ydb/core/kqp/ut/query/kqp_stats_ut.cpp
index 04a58539bb..b91955cb65 100644
--- a/ydb/core/kqp/ut/query/kqp_stats_ut.cpp
+++ b/ydb/core/kqp/ut/query/kqp_stats_ut.cpp
@@ -1,6 +1,7 @@
#include <ydb/core/kqp/ut/common/kqp_ut_common.h>
#include <ydb/public/sdk/cpp/client/ydb_table/table.h>
#include <ydb/public/sdk/cpp/client/resources/ydb_resources.h>
+#include <ydb/public/sdk/cpp/client/ydb_proto/accessor.h>
#include <ydb/library/yql/dq/actors/compute/dq_compute_actor.h>
@@ -345,6 +346,41 @@ Y_UNIT_TEST(StatsProfile) {
//UNIT_ASSERT_EQUAL(node2.GetMap().at("Stats").GetMapSafe().at("ComputeNodes").GetArraySafe().size(), 1);
}
+Y_UNIT_TEST(StreamLookupStats) {
+ NKikimrConfig::TAppConfig app;
+ app.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(true);
+ TKikimrRunner kikimr(TKikimrSettings().SetAppConfig(app));
+ auto db = kikimr.GetTableClient();
+ auto session = db.CreateSession().GetValueSync().GetSession();
+
+ TExecDataQuerySettings settings;
+ settings.CollectQueryStats(ECollectQueryStatsMode::Full);
+
+ auto result = session.ExecuteDataQuery(R"(
+ $keys = SELECT Key FROM `/Root/KeyValue`;
+ SELECT * FROM `/Root/TwoShard` WHERE Key in $keys;
+ )", TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+
+ Cerr << result.GetQueryPlan() << Endl;
+
+ NJson::TJsonValue plan;
+ NJson::ReadJsonTree(result.GetQueryPlan(), &plan, true);
+ auto streamLookup = FindPlanNodeByKv(plan, "Node Type", "TableLookup");
+ UNIT_ASSERT(streamLookup.IsDefined());
+
+ auto& stats = NYdb::TProtoAccessor::GetProto(*result.GetStats());
+ UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 2);
+ UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(1).affected_shards(), 1);
+ UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(1).table_access().size(), 1);
+ UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(1).table_access(0).name(), "/Root/TwoShard");
+ UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(1).table_access(0).partitions_count(), 1);
+
+ AssertTableStats(result, "/Root/TwoShard", {
+ .ExpectedReads = 2,
+ });
+}
+
} // suite
} // namespace NKqp
diff --git a/ydb/core/protos/kqp_stats.proto b/ydb/core/protos/kqp_stats.proto
index c2579c24ff..d13a1a5a39 100644
--- a/ydb/core/protos/kqp_stats.proto
+++ b/ydb/core/protos/kqp_stats.proto
@@ -24,6 +24,11 @@ message TKqpShardTableAggrExtraStats {
NYql.NDqProto.TDqStatsAggr ShardCpuTimeUs = 2;
}
+// aggregated read actor extra stats for table
+message TKqpReadActorTableAggrExtraStats {
+ repeated uint64 AffectedShards = 1;
+}
+
message TKqpScanTableExtraStats {
// IScan stats
uint64 IScanStartTimeMs = 1; // start IScan timestamp
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
index d52073182e..8dc0d00ae0 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
@@ -1973,10 +1973,10 @@ public:
protoTransform->SetIngressBytes(ingressBytes);
protoTransform->SetMaxMemoryUsage(transformStats->MaxMemoryUsage);
+ }
- if (auto* transform = transformInfo.AsyncInput) {
- transform->FillExtraStats(protoTask, last);
- }
+ if (auto* transform = transformInfo.AsyncInput) {
+ transform->FillExtraStats(protoTask, last);
}
}