diff options
author | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-04-12 23:57:36 +0300 |
---|---|---|
committer | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-04-12 23:57:36 +0300 |
commit | 31ee948ba6ec61a904299ec395dcd523f2755590 (patch) | |
tree | 74b397b426818ef9ce07a39b4e7d76357704f20c | |
parent | c39b26541160e24365cf7d5981215c33913e9f14 (diff) | |
download | ydb-31ee948ba6ec61a904299ec395dcd523f2755590.tar.gz |
provide shards count for nodes on scan
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_executer_stats.cpp | 19 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_executer_stats.h | 6 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_scan_executer.cpp | 6 | ||||
-rw-r--r-- | ydb/core/kqp/opt/kqp_query_plan.cpp | 11 | ||||
-rw-r--r-- | ydb/core/kqp/ut/olap/kqp_olap_ut.cpp | 65 | ||||
-rw-r--r-- | ydb/core/protos/kqp_stats.proto | 6 |
6 files changed, 111 insertions, 2 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_executer_stats.cpp b/ydb/core/kqp/executer_actor/kqp_executer_stats.cpp index 9ec8de3471f..60d0f6fdfc3 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_stats.cpp +++ b/ydb/core/kqp/executer_actor/kqp_executer_stats.cpp @@ -305,6 +305,25 @@ void TQueryExecutionStats::Finish() { Result->SetResultRows(ResultRows); ExtraStats.SetAffectedShards(AffectedShards.size()); + if (CollectProfileStats(StatsMode)) { + for (auto&& s : ShardsCountByNode) { + for (auto&& pbs : *Result->MutableStages()) { + if (pbs.GetStageId() == s.first) { + NKqpProto::TKqpStageExtraStats pbStats; + if (pbs.HasExtra() && !pbs.GetExtra().UnpackTo(&pbStats)) { + break; + } + for (auto&& i : s.second) { + auto& nodeStat = *pbStats.AddNodeStats(); + nodeStat.SetNodeId(i.first); + nodeStat.SetShardsCount(i.second); + } + pbs.MutableExtra()->PackFrom(pbStats); + break; + } + } + } + } Result->MutableExtra()->PackFrom(ExtraStats); if (CollectFullStats(StatsMode)) { diff --git a/ydb/core/kqp/executer_actor/kqp_executer_stats.h b/ydb/core/kqp/executer_actor/kqp_executer_stats.h index 4c936207bca..521f4f1c8df 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_stats.h +++ b/ydb/core/kqp/executer_actor/kqp_executer_stats.h @@ -14,6 +14,9 @@ bool CollectFullStats(Ydb::Table::QueryStatsCollection::Mode statsMode); bool CollectProfileStats(Ydb::Table::QueryStatsCollection::Mode statsMode); struct TQueryExecutionStats { +private: + std::map<ui32, std::map<ui32, ui32>> ShardsCountByNode; +public: const Ydb::Table::QueryStatsCollection::Mode StatsMode; const TKqpTasksGraph* const TasksGraph = nullptr; NYql::NDqProto::TDqExecutionStats* const Result; @@ -47,6 +50,9 @@ struct TQueryExecutionStats { } void AddComputeActorStats(ui32 nodeId, NYql::NDqProto::TDqComputeActorStats&& stats); + void AddNodeShardsCount(const ui32 stageId, const ui32 nodeId, const ui32 shardsCount) { + Y_VERIFY(ShardsCountByNode[stageId].emplace(nodeId, shardsCount).second); + } void AddDatashardPrepareStats(NKikimrQueryStats::TTxStats&& txStats); void AddDatashardStats(NYql::NDqProto::TDqComputeActorStats&& stats, NKikimrQueryStats::TTxStats&& txStats); diff --git a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp index 68e11574863..079ee399e9d 100644 --- a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp @@ -354,6 +354,12 @@ private: nodeShards[nodeId].emplace_back(TShardInfoWithId(i.first, std::move(i.second))); } + if (Stats && CollectProfileStats(Request.StatsMode)) { + for (auto&& i : nodeShards) { + Stats->AddNodeShardsCount(stageInfo.Id.StageId, i.first, i.second.size()); + } + } + if (!AppData()->FeatureFlags.GetEnableSeparationComputeActorsFromRead()) { for (auto&& pair : nodeShards) { auto& shardsInfo = pair.second; diff --git a/ydb/core/kqp/opt/kqp_query_plan.cpp b/ydb/core/kqp/opt/kqp_query_plan.cpp index 17fa8edfd90..c523a703aed 100644 --- a/ydb/core/kqp/opt/kqp_query_plan.cpp +++ b/ydb/core/kqp/opt/kqp_query_plan.cpp @@ -1711,6 +1711,17 @@ TString AddExecStatsToTxPlan(const TString& txPlanJson, const NYql::NDqProto::TD stats["TotalOutputRows"] = (*stat)->GetOutputRows().GetSum(); stats["TotalOutputBytes"] = (*stat)->GetOutputBytes().GetSum(); + NKqpProto::TKqpStageExtraStats kqpStageStats; + if ((*stat)->GetExtra().UnpackTo(&kqpStageStats)) { + auto& nodesStats = stats.InsertValue("NodesScanShards", NJson::JSON_ARRAY); + for (auto&& i : kqpStageStats.GetNodeStats()) { + auto& nodeInfo = nodesStats.AppendValue(NJson::JSON_MAP); + nodeInfo.InsertValue("shards_count", i.GetShardsCount()); + nodeInfo.InsertValue("node_id", i.GetNodeId()); + } + + } + for (auto& caStats : (*stat)->GetComputeActors()) { auto& caNode = stats["ComputeNodes"].AppendValue(NJson::TJsonValue()); fillCaStats(caNode, caStats); diff --git a/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp b/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp index 21c8b052c97..7746aa6410e 100644 --- a/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp +++ b/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp @@ -220,9 +220,11 @@ Y_UNIT_TEST_SUITE(KqpOlap) { UNIT_ASSERT_VALUES_EQUAL_C(resCommitTx.Status().GetStatus(), EStatus::SUCCESS, resCommitTx.Status().GetIssues().ToString()); } - TVector<THashMap<TString, NYdb::TValue>> CollectRows(NYdb::NTable::TScanQueryPartIterator& it) { + TVector<THashMap<TString, NYdb::TValue>> CollectRows(NYdb::NTable::TScanQueryPartIterator& it, NJson::TJsonValue* statInfo = nullptr) { TVector<THashMap<TString, NYdb::TValue>> rows; - + if (statInfo) { + *statInfo = NJson::JSON_NULL; + } for (;;) { auto streamPart = it.ReadNext().GetValueSync(); if (!streamPart.IsSuccess()) { @@ -233,6 +235,13 @@ Y_UNIT_TEST_SUITE(KqpOlap) { UNIT_ASSERT_C(streamPart.HasResultSet() || streamPart.HasQueryStats(), "Unexpected empty scan query response."); + if (streamPart.HasQueryStats()) { + auto plan = streamPart.GetQueryStats().GetPlan(); + if (plan && statInfo) { + UNIT_ASSERT(NJson::ReadJsonFastTree(*plan, statInfo)); + } + } + if (streamPart.HasResultSet()) { auto resultSet = streamPart.ExtractResultSet(); NYdb::TResultSetParser rsParser(resultSet); @@ -544,6 +553,58 @@ Y_UNIT_TEST_SUITE(KqpOlap) { } } + Y_UNIT_TEST(SimpleQueryOlapStats) { + auto settings = TKikimrSettings() + .SetWithSampleTables(false); + TKikimrRunner kikimr(settings); + + // EnableDebugLogging(kikimr); + + TLocalHelper(kikimr).CreateTestOlapTable(); + + WriteTestData(kikimr, "/Root/olapStore/olapTable", 0, 1000000, 2); + + auto client = kikimr.GetTableClient(); + + // EnableDebugLogging(kikimr); + + { + TStreamExecScanQuerySettings settings; + settings.CollectQueryStats(ECollectQueryStatsMode::Full); + auto it = client.StreamExecuteScanQuery(R"( + --!syntax_v1 + SELECT `resource_id`, `timestamp` + FROM `/Root/olapStore/olapTable` + ORDER BY `resource_id`, `timestamp` + )", settings).GetValueSync(); + + UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString()); + NJson::TJsonValue jsonStat; + CollectRows(it, &jsonStat); + UNIT_ASSERT(!jsonStat.IsNull()); + const TString plan = jsonStat.GetStringRobust(); + Cerr << plan << Endl; + UNIT_ASSERT(plan.find("NodesScanShards") == TString::npos); + } + + { + TStreamExecScanQuerySettings settings; + settings.CollectQueryStats(ECollectQueryStatsMode::Profile); + auto it = client.StreamExecuteScanQuery(R"( + --!syntax_v1 + SELECT `resource_id`, `timestamp` + FROM `/Root/olapStore/olapTable` + ORDER BY `resource_id`, `timestamp` + )", settings).GetValueSync(); + NJson::TJsonValue jsonStat; + UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString()); + CollectRows(it, &jsonStat); + const TString plan = jsonStat.GetStringRobust(); + Cerr << plan << Endl; + UNIT_ASSERT(plan.find("NodesScanShards") != TString::npos); + } + } + Y_UNIT_TEST(SimpleLookupOlap) { auto settings = TKikimrSettings() .SetWithSampleTables(false); diff --git a/ydb/core/protos/kqp_stats.proto b/ydb/core/protos/kqp_stats.proto index 1869da2b121..24e09f2dcd9 100644 --- a/ydb/core/protos/kqp_stats.proto +++ b/ydb/core/protos/kqp_stats.proto @@ -41,6 +41,7 @@ message TKqpComputeActorExtraStats { // extra for NYql.NDqProto.TDqStageStats message TKqpStageExtraStats { repeated NYql.NDqProto.TDqTaskStats DatashardTasks = 1; + repeated TNodeExecutionStats NodeStats = 4; } message TKqpScanTaskExtraStats { @@ -52,6 +53,11 @@ message TKqpTaskExtraStats { TKqpScanTaskExtraStats ScanTaskExtraStats = 1; } +message TNodeExecutionStats { + optional uint32 NodeId = 1; + optional uint32 ShardsCount = 2; +} + // extra for NYql.NDqProto.TDqExecutionStats message TKqpExecutionExtraStats { // Basic stats |