aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov <ivanmorozov@yandex-team.com>2023-04-12 23:57:36 +0300
committerivanmorozov <ivanmorozov@yandex-team.com>2023-04-12 23:57:36 +0300
commit31ee948ba6ec61a904299ec395dcd523f2755590 (patch)
tree74b397b426818ef9ce07a39b4e7d76357704f20c
parentc39b26541160e24365cf7d5981215c33913e9f14 (diff)
downloadydb-31ee948ba6ec61a904299ec395dcd523f2755590.tar.gz
provide shards count for nodes on scan
-rw-r--r--ydb/core/kqp/executer_actor/kqp_executer_stats.cpp19
-rw-r--r--ydb/core/kqp/executer_actor/kqp_executer_stats.h6
-rw-r--r--ydb/core/kqp/executer_actor/kqp_scan_executer.cpp6
-rw-r--r--ydb/core/kqp/opt/kqp_query_plan.cpp11
-rw-r--r--ydb/core/kqp/ut/olap/kqp_olap_ut.cpp65
-rw-r--r--ydb/core/protos/kqp_stats.proto6
6 files changed, 111 insertions, 2 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_executer_stats.cpp b/ydb/core/kqp/executer_actor/kqp_executer_stats.cpp
index 9ec8de3471f..60d0f6fdfc3 100644
--- a/ydb/core/kqp/executer_actor/kqp_executer_stats.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_executer_stats.cpp
@@ -305,6 +305,25 @@ void TQueryExecutionStats::Finish() {
Result->SetResultRows(ResultRows);
ExtraStats.SetAffectedShards(AffectedShards.size());
+ if (CollectProfileStats(StatsMode)) {
+ for (auto&& s : ShardsCountByNode) {
+ for (auto&& pbs : *Result->MutableStages()) {
+ if (pbs.GetStageId() == s.first) {
+ NKqpProto::TKqpStageExtraStats pbStats;
+ if (pbs.HasExtra() && !pbs.GetExtra().UnpackTo(&pbStats)) {
+ break;
+ }
+ for (auto&& i : s.second) {
+ auto& nodeStat = *pbStats.AddNodeStats();
+ nodeStat.SetNodeId(i.first);
+ nodeStat.SetShardsCount(i.second);
+ }
+ pbs.MutableExtra()->PackFrom(pbStats);
+ break;
+ }
+ }
+ }
+ }
Result->MutableExtra()->PackFrom(ExtraStats);
if (CollectFullStats(StatsMode)) {
diff --git a/ydb/core/kqp/executer_actor/kqp_executer_stats.h b/ydb/core/kqp/executer_actor/kqp_executer_stats.h
index 4c936207bca..521f4f1c8df 100644
--- a/ydb/core/kqp/executer_actor/kqp_executer_stats.h
+++ b/ydb/core/kqp/executer_actor/kqp_executer_stats.h
@@ -14,6 +14,9 @@ bool CollectFullStats(Ydb::Table::QueryStatsCollection::Mode statsMode);
bool CollectProfileStats(Ydb::Table::QueryStatsCollection::Mode statsMode);
struct TQueryExecutionStats {
+private:
+ std::map<ui32, std::map<ui32, ui32>> ShardsCountByNode;
+public:
const Ydb::Table::QueryStatsCollection::Mode StatsMode;
const TKqpTasksGraph* const TasksGraph = nullptr;
NYql::NDqProto::TDqExecutionStats* const Result;
@@ -47,6 +50,9 @@ struct TQueryExecutionStats {
}
void AddComputeActorStats(ui32 nodeId, NYql::NDqProto::TDqComputeActorStats&& stats);
+ void AddNodeShardsCount(const ui32 stageId, const ui32 nodeId, const ui32 shardsCount) {
+ Y_VERIFY(ShardsCountByNode[stageId].emplace(nodeId, shardsCount).second);
+ }
void AddDatashardPrepareStats(NKikimrQueryStats::TTxStats&& txStats);
void AddDatashardStats(NYql::NDqProto::TDqComputeActorStats&& stats, NKikimrQueryStats::TTxStats&& txStats);
diff --git a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp
index 68e11574863..079ee399e9d 100644
--- a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp
@@ -354,6 +354,12 @@ private:
nodeShards[nodeId].emplace_back(TShardInfoWithId(i.first, std::move(i.second)));
}
+ if (Stats && CollectProfileStats(Request.StatsMode)) {
+ for (auto&& i : nodeShards) {
+ Stats->AddNodeShardsCount(stageInfo.Id.StageId, i.first, i.second.size());
+ }
+ }
+
if (!AppData()->FeatureFlags.GetEnableSeparationComputeActorsFromRead()) {
for (auto&& pair : nodeShards) {
auto& shardsInfo = pair.second;
diff --git a/ydb/core/kqp/opt/kqp_query_plan.cpp b/ydb/core/kqp/opt/kqp_query_plan.cpp
index 17fa8edfd90..c523a703aed 100644
--- a/ydb/core/kqp/opt/kqp_query_plan.cpp
+++ b/ydb/core/kqp/opt/kqp_query_plan.cpp
@@ -1711,6 +1711,17 @@ TString AddExecStatsToTxPlan(const TString& txPlanJson, const NYql::NDqProto::TD
stats["TotalOutputRows"] = (*stat)->GetOutputRows().GetSum();
stats["TotalOutputBytes"] = (*stat)->GetOutputBytes().GetSum();
+ NKqpProto::TKqpStageExtraStats kqpStageStats;
+ if ((*stat)->GetExtra().UnpackTo(&kqpStageStats)) {
+ auto& nodesStats = stats.InsertValue("NodesScanShards", NJson::JSON_ARRAY);
+ for (auto&& i : kqpStageStats.GetNodeStats()) {
+ auto& nodeInfo = nodesStats.AppendValue(NJson::JSON_MAP);
+ nodeInfo.InsertValue("shards_count", i.GetShardsCount());
+ nodeInfo.InsertValue("node_id", i.GetNodeId());
+ }
+
+ }
+
for (auto& caStats : (*stat)->GetComputeActors()) {
auto& caNode = stats["ComputeNodes"].AppendValue(NJson::TJsonValue());
fillCaStats(caNode, caStats);
diff --git a/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp b/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp
index 21c8b052c97..7746aa6410e 100644
--- a/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp
+++ b/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp
@@ -220,9 +220,11 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
UNIT_ASSERT_VALUES_EQUAL_C(resCommitTx.Status().GetStatus(), EStatus::SUCCESS, resCommitTx.Status().GetIssues().ToString());
}
- TVector<THashMap<TString, NYdb::TValue>> CollectRows(NYdb::NTable::TScanQueryPartIterator& it) {
+ TVector<THashMap<TString, NYdb::TValue>> CollectRows(NYdb::NTable::TScanQueryPartIterator& it, NJson::TJsonValue* statInfo = nullptr) {
TVector<THashMap<TString, NYdb::TValue>> rows;
-
+ if (statInfo) {
+ *statInfo = NJson::JSON_NULL;
+ }
for (;;) {
auto streamPart = it.ReadNext().GetValueSync();
if (!streamPart.IsSuccess()) {
@@ -233,6 +235,13 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
UNIT_ASSERT_C(streamPart.HasResultSet() || streamPart.HasQueryStats(),
"Unexpected empty scan query response.");
+ if (streamPart.HasQueryStats()) {
+ auto plan = streamPart.GetQueryStats().GetPlan();
+ if (plan && statInfo) {
+ UNIT_ASSERT(NJson::ReadJsonFastTree(*plan, statInfo));
+ }
+ }
+
if (streamPart.HasResultSet()) {
auto resultSet = streamPart.ExtractResultSet();
NYdb::TResultSetParser rsParser(resultSet);
@@ -544,6 +553,58 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
}
}
+ Y_UNIT_TEST(SimpleQueryOlapStats) {
+ auto settings = TKikimrSettings()
+ .SetWithSampleTables(false);
+ TKikimrRunner kikimr(settings);
+
+ // EnableDebugLogging(kikimr);
+
+ TLocalHelper(kikimr).CreateTestOlapTable();
+
+ WriteTestData(kikimr, "/Root/olapStore/olapTable", 0, 1000000, 2);
+
+ auto client = kikimr.GetTableClient();
+
+ // EnableDebugLogging(kikimr);
+
+ {
+ TStreamExecScanQuerySettings settings;
+ settings.CollectQueryStats(ECollectQueryStatsMode::Full);
+ auto it = client.StreamExecuteScanQuery(R"(
+ --!syntax_v1
+ SELECT `resource_id`, `timestamp`
+ FROM `/Root/olapStore/olapTable`
+ ORDER BY `resource_id`, `timestamp`
+ )", settings).GetValueSync();
+
+ UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString());
+ NJson::TJsonValue jsonStat;
+ CollectRows(it, &jsonStat);
+ UNIT_ASSERT(!jsonStat.IsNull());
+ const TString plan = jsonStat.GetStringRobust();
+ Cerr << plan << Endl;
+ UNIT_ASSERT(plan.find("NodesScanShards") == TString::npos);
+ }
+
+ {
+ TStreamExecScanQuerySettings settings;
+ settings.CollectQueryStats(ECollectQueryStatsMode::Profile);
+ auto it = client.StreamExecuteScanQuery(R"(
+ --!syntax_v1
+ SELECT `resource_id`, `timestamp`
+ FROM `/Root/olapStore/olapTable`
+ ORDER BY `resource_id`, `timestamp`
+ )", settings).GetValueSync();
+ NJson::TJsonValue jsonStat;
+ UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString());
+ CollectRows(it, &jsonStat);
+ const TString plan = jsonStat.GetStringRobust();
+ Cerr << plan << Endl;
+ UNIT_ASSERT(plan.find("NodesScanShards") != TString::npos);
+ }
+ }
+
Y_UNIT_TEST(SimpleLookupOlap) {
auto settings = TKikimrSettings()
.SetWithSampleTables(false);
diff --git a/ydb/core/protos/kqp_stats.proto b/ydb/core/protos/kqp_stats.proto
index 1869da2b121..24e09f2dcd9 100644
--- a/ydb/core/protos/kqp_stats.proto
+++ b/ydb/core/protos/kqp_stats.proto
@@ -41,6 +41,7 @@ message TKqpComputeActorExtraStats {
// extra for NYql.NDqProto.TDqStageStats
message TKqpStageExtraStats {
repeated NYql.NDqProto.TDqTaskStats DatashardTasks = 1;
+ repeated TNodeExecutionStats NodeStats = 4;
}
message TKqpScanTaskExtraStats {
@@ -52,6 +53,11 @@ message TKqpTaskExtraStats {
TKqpScanTaskExtraStats ScanTaskExtraStats = 1;
}
+message TNodeExecutionStats {
+ optional uint32 NodeId = 1;
+ optional uint32 ShardsCount = 2;
+}
+
// extra for NYql.NDqProto.TDqExecutionStats
message TKqpExecutionExtraStats {
// Basic stats