diff options
author | Sergei Puchin <[email protected]> | 2022-06-19 00:13:19 +0300 |
---|---|---|
committer | Sergei Puchin <[email protected]> | 2022-06-19 00:13:19 +0300 |
commit | 8e2e903d222a17b6ba3da50f43d0e6bf49f8d583 (patch) | |
tree | fdeaea62fb7f4b02ddb0c23e628fe23f68f07847 | |
parent | d28ffff79b7e2c859289d62a313c978a6d009f5d (diff) |
Fix query stats reporting for ScanQueries. (KIKIMR-15013)
ref:351908dc7be3e8df9e53d043f79f8638c509fae9
-rw-r--r-- | ydb/core/kqp/host/kqp_host.cpp | 4 | ||||
-rw-r--r-- | ydb/core/kqp/host/kqp_run_scan.cpp | 5 | ||||
-rw-r--r-- | ydb/core/kqp/host/kqp_runner.cpp | 13 | ||||
-rw-r--r-- | ydb/core/kqp/kqp_ic_gateway.cpp | 8 | ||||
-rw-r--r-- | ydb/core/kqp/kqp_session_actor.cpp | 47 | ||||
-rw-r--r-- | ydb/core/kqp/ut/common/kqp_ut_common.cpp | 28 | ||||
-rw-r--r-- | ydb/core/kqp/ut/common/kqp_ut_common.h | 2 | ||||
-rw-r--r-- | ydb/core/kqp/ut/kqp_ne_flowcontrol_ut.cpp | 47 | ||||
-rw-r--r-- | ydb/core/kqp/ut/kqp_scan_ut.cpp | 85 | ||||
-rw-r--r-- | ydb/core/kqp/ut/kqp_stats_ut.cpp | 16 | ||||
-rw-r--r-- | ydb/core/kqp/ut/kqp_sys_view_ut.cpp | 46 | ||||
-rw-r--r-- | ydb/core/kqp/ut/kqp_table_predicate_ut.cpp | 32 |
12 files changed, 161 insertions, 172 deletions
diff --git a/ydb/core/kqp/host/kqp_host.cpp b/ydb/core/kqp/host/kqp_host.cpp index 97b73df73a5..ee6183d30a6 100644 --- a/ydb/core/kqp/host/kqp_host.cpp +++ b/ydb/core/kqp/host/kqp_host.cpp @@ -283,10 +283,6 @@ public: queryResult.PreparedQuery = Query; } - /* - * Set stats and plan for DataQuery. In case of ScanQuery they will be set - * later in TStreamExecuteScanQueryRPC. - */ if (ExecuteCtx->QueryResults.size() == 1) { auto& execResult = ExecuteCtx->QueryResults[0]; queryResult.QueryStats.Swap(&execResult.QueryStats); diff --git a/ydb/core/kqp/host/kqp_run_scan.cpp b/ydb/core/kqp/host/kqp_run_scan.cpp index caad430c281..38a67850eb2 100644 --- a/ydb/core/kqp/host/kqp_run_scan.cpp +++ b/ydb/core/kqp/host/kqp_run_scan.cpp @@ -57,10 +57,13 @@ protected: } bool OnExecuterResult(NKikimrKqp::TExecuterTxResult&& execResult, TExprContext& ctx, bool commit) override { - Y_UNUSED(execResult); Y_UNUSED(ctx); Y_UNUSED(commit); + if (execResult.HasStats()) { + TransformCtx->QueryStats.AddExecutions()->Swap(execResult.MutableStats()); + } + return true; } }; diff --git a/ydb/core/kqp/host/kqp_runner.cpp b/ydb/core/kqp/host/kqp_runner.cpp index 64ad34e606b..0f6abe1d13b 100644 --- a/ydb/core/kqp/host/kqp_runner.cpp +++ b/ydb/core/kqp/host/kqp_runner.cpp @@ -97,12 +97,17 @@ class TScanAsyncRunResult : public TKqpAsyncResultBase<IKikimrQueryExecutor::TQu public: using TResult = IKikimrQueryExecutor::TQueryResult; - TScanAsyncRunResult(const TExprNode::TPtr& queryRoot, TExprContext& exprCtx, IGraphTransformer& transformer) - : TKqpAsyncResultBase(queryRoot, exprCtx, transformer) {} + TScanAsyncRunResult(const TExprNode::TPtr& queryRoot, TExprContext& exprCtx, IGraphTransformer& transformer, + const TKqlTransformContext& transformCtx) + : TKqpAsyncResultBase(queryRoot, exprCtx, transformer) + , TransformCtx(transformCtx) {} void FillResult(TResult& queryResult) const override { - Y_UNUSED(queryResult); + queryResult.QueryStats.CopyFrom(TransformCtx.QueryStats); } + +private: + const TKqlTransformContext& TransformCtx; }; class TKqpIterationGuardTransformer : public TSyncTransformerBase { @@ -609,7 +614,7 @@ private: Y_ASSERT(!TxState->Tx().GetSnapshot().IsValid()); - return MakeIntrusive<TScanAsyncRunResult>(world, ctx, *ScanRunQueryTransformer); + return MakeIntrusive<TScanAsyncRunResult>(world, ctx, *ScanRunQueryTransformer, *TransformCtx); } static bool MergeFlagValue(const TMaybe<bool>& configFlag, const TMaybe<bool>& flag) { diff --git a/ydb/core/kqp/kqp_ic_gateway.cpp b/ydb/core/kqp/kqp_ic_gateway.cpp index f99e1333b39..2e038d4bbf3 100644 --- a/ydb/core/kqp/kqp_ic_gateway.cpp +++ b/ydb/core/kqp/kqp_ic_gateway.cpp @@ -882,14 +882,6 @@ private: result.ExecuterResult.Swap(response->MutableResult()); - if (Target && result.ExecuterResult.HasStats()) { - auto statsEv = MakeHolder<TEvKqpExecuter::TEvStreamProfile>(); - auto& record = statsEv->Record; - - record.MutableProfile()->Swap(result.ExecuterResult.MutableStats()); - this->Send(Target, statsEv.Release()); - } - Promise.SetValue(std::move(result)); this->PassAway(); } diff --git a/ydb/core/kqp/kqp_session_actor.cpp b/ydb/core/kqp/kqp_session_actor.cpp index 4e59a6ae6f0..81519273875 100644 --- a/ydb/core/kqp/kqp_session_actor.cpp +++ b/ydb/core/kqp/kqp_session_actor.cpp @@ -1070,24 +1070,14 @@ public: return; } - bool scan = QueryState->Request.GetType() == NKikimrKqp::QUERY_TYPE_SQL_SCAN; - if (scan) { - if (QueryState->RequestActorId && txResult.HasStats()) { - auto statsEv = MakeHolder<TEvKqpExecuter::TEvStreamProfile>(); - auto& record = statsEv->Record; - - record.MutableProfile()->Swap(txResult.MutableStats()); - Send(QueryState->RequestActorId, statsEv.Release()); - } - } else { - if (txResult.HasStats()) { - auto* exec = QueryState->Stats.AddExecutions(); - exec->Swap(txResult.MutableStats()); - } + if (txResult.HasStats()) { + auto* exec = QueryState->Stats.AddExecutions(); + exec->Swap(txResult.MutableStats()); } if (QueryState->PreparedQuery && - QueryState->CurrentTx < QueryState->PreparedQuery->GetPhysicalQuery().TransactionsSize()) { + QueryState->CurrentTx < QueryState->PreparedQuery->GetPhysicalQuery().TransactionsSize()) + { ExecuteOrDefer(); } else { ReplySuccess(); @@ -1203,32 +1193,6 @@ public: } } - // TODO: Remove? Is it actual for NewEngine? - void FillQueryProfile(const NKqpProto::TKqpStatsQuery& stats, NKikimrKqp::TQueryResponse& response) { - auto& kqlProfile = *response.MutableProfile()->AddKqlProfiles(); - for (auto& execStats : stats.GetExecutions()) { - auto& txStats = *kqlProfile.AddMkqlProfiles()->MutableTxStats(); - - txStats.SetDurationUs(execStats.GetDurationUs()); - for (auto& tableStats : execStats.GetTables()) { - auto& txTableStats = *txStats.AddTableAccessStats(); - - txTableStats.MutableTableInfo()->SetName(tableStats.GetTablePath()); - if (tableStats.GetReadRows() > 0) { - txTableStats.MutableSelectRange()->SetRows(tableStats.GetReadRows()); - txTableStats.MutableSelectRange()->SetBytes(tableStats.GetReadBytes()); - } - if (tableStats.GetWriteRows() > 0) { - txTableStats.MutableUpdateRow()->SetCount(tableStats.GetWriteRows()); - txTableStats.MutableUpdateRow()->SetBytes(tableStats.GetWriteBytes()); - } - if (tableStats.GetEraseRows() > 0) { - txTableStats.MutableEraseRow()->SetCount(tableStats.GetEraseRows()); - } - } - } - } - void FillStats(NKikimrKqp::TEvQueryResponse* record) { auto *response = record->MutableResponse(); auto* stats = &QueryState->Stats; @@ -1263,7 +1227,6 @@ public: bool reportStats = (GetStatsModeInt(queryRequest) != EKikimrStatsMode::None); if (reportStats) { - FillQueryProfile(*stats, *response); response->SetQueryPlan(SerializeAnalyzePlan(*stats)); response->MutableQueryStats()->Swap(stats); diff --git a/ydb/core/kqp/ut/common/kqp_ut_common.cpp b/ydb/core/kqp/ut/common/kqp_ut_common.cpp index 27e75b5fc90..4840e8e2643 100644 --- a/ydb/core/kqp/ut/common/kqp_ut_common.cpp +++ b/ydb/core/kqp/ut/common/kqp_ut_common.cpp @@ -687,6 +687,7 @@ TCollectedStreamResult CollectStreamResultImpl(TIterator& it) { if (streamPart.HasResultSet()) { auto resultSet = streamPart.ExtractResultSet(); PrintResultSet(resultSet, resultSetWriter); + res.RowsCount += resultSet.RowsCount(); } if constexpr (std::is_same_v<TIterator, NYdb::NTable::TScanQueryPartIterator>) { @@ -822,6 +823,33 @@ NJson::TJsonValue FindPlanNodeByKv(const NJson::TJsonValue& plan, const TString& return NJson::TJsonValue(); } +void FindPlanNodesImpl(const NJson::TJsonValue& node, const TString& key, std::vector<NJson::TJsonValue>& results) { + if (node.IsArray()) { + for (const auto& item: node.GetArray()) { + FindPlanNodesImpl(item, key, results); + } + } + + if (!node.IsMap()) { + return; + } + + auto map = node.GetMap(); + if (map.contains(key)) { + results.push_back(map.at(key)); + } + + for (const auto& [_, value]: map) { + FindPlanNodesImpl(value, key, results); + } +} + +std::vector<NJson::TJsonValue> FindPlanNodes(const NJson::TJsonValue& plan, const TString& key) { + std::vector<NJson::TJsonValue> results; + FindPlanNodesImpl(plan, key, results); + return results; +} + void CreateSampleTablesWithIndex(TSession& session) { auto res = session.ExecuteSchemeQuery(R"( --!syntax_v1 diff --git a/ydb/core/kqp/ut/common/kqp_ut_common.h b/ydb/core/kqp/ut/common/kqp_ut_common.h index c1ea5995d9e..07d94ee25a1 100644 --- a/ydb/core/kqp/ut/common/kqp_ut_common.h +++ b/ydb/core/kqp/ut/common/kqp_ut_common.h @@ -173,6 +173,7 @@ struct TCollectedStreamResult { TString ResultSetYson; TMaybe<TString> PlanJson; TMaybe<Ydb::TableStats::QueryStats> QueryStats; + ui64 RowsCount = 0; }; TCollectedStreamResult CollectStreamResult(NYdb::NExperimental::TStreamPartIterator& it); @@ -243,6 +244,7 @@ TString StreamResultToYson(NYdb::NTable::TTablePartIterator& it); ui32 CountPlanNodesByKv(const NJson::TJsonValue& plan, const TString& key, const TString& value); NJson::TJsonValue FindPlanNodeByKv(const NJson::TJsonValue& plan, const TString& key, const TString& value); +std::vector<NJson::TJsonValue> FindPlanNodes(const NJson::TJsonValue& plan, const TString& key); TString ReadTableToYson(NYdb::NTable::TSession session, const TString& table); TString ReadTablePartToYson(NYdb::NTable::TSession session, const TString& table); diff --git a/ydb/core/kqp/ut/kqp_ne_flowcontrol_ut.cpp b/ydb/core/kqp/ut/kqp_ne_flowcontrol_ut.cpp index 2aceabbd3f9..e9d6133b51f 100644 --- a/ydb/core/kqp/ut/kqp_ne_flowcontrol_ut.cpp +++ b/ydb/core/kqp/ut/kqp_ne_flowcontrol_ut.cpp @@ -49,8 +49,6 @@ void CreateSampleTables(TKikimrRunner& kikimr) { Y_UNIT_TEST_SUITE(KqpFlowControl) { -#if !defined(MKQL_RUNTIME_VERSION) || MKQL_RUNTIME_VERSION >= 9u - void DoFlowControlTest(ui64 limit, bool hasBlockedByCapacity, bool useSessionActor) { NKikimrConfig::TAppConfig appCfg; appCfg.MutableTableServiceConfig()->MutableResourceManager()->SetChannelBufferSize(limit); @@ -59,11 +57,9 @@ void DoFlowControlTest(ui64 limit, bool hasBlockedByCapacity, bool useSessionAct appCfg.MutableTableServiceConfig()->MutableResourceManager()->SetMkqlHeavyProgramMemoryLimit(200ul << 20); appCfg.MutableTableServiceConfig()->MutableResourceManager()->SetQueryMemoryLimit(20ul << 30); - //TKikimrRunner kikimr{appCfg, "", KikimrDefaultUtDomainRoot}; auto kikimr = KikimrRunnerEnableSessionActor(useSessionActor, {}, appCfg); - CreateSampleTables(kikimr); - NExperimental::TStreamQueryClient db(kikimr.GetDriver()); + auto db = kikimr.GetTableClient(); Y_DEFER { NYql::NDq::GetDqExecutionSettingsForTests().Reset(); @@ -72,10 +68,10 @@ void DoFlowControlTest(ui64 limit, bool hasBlockedByCapacity, bool useSessionAct NYql::NDq::GetDqExecutionSettingsForTests().FlowControl.MaxOutputChunkSize = limit; NYql::NDq::GetDqExecutionSettingsForTests().FlowControl.InFlightBytesOvercommit = 1.0f; - auto settings = NExperimental::TExecuteStreamQuerySettings() - .ProfileMode(NExperimental::EStreamQueryProfileMode::Profile); + TStreamExecScanQuerySettings settings; + settings.CollectQueryStats(ECollectQueryStatsMode::Profile); - auto result = db.ExecuteStreamQuery(R"( + auto it = db.StreamExecuteScanQuery(R"( $r = (select * from `/Root/FourShard` where Key > 201); SELECT l.Key as key, l.Text as text, r.Value1 as value @@ -83,33 +79,26 @@ void DoFlowControlTest(ui64 limit, bool hasBlockedByCapacity, bool useSessionAct ORDER BY key, text, value )", settings).GetValueSync(); - UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString()); + + auto res = CollectStreamResult(it); - TVector<TString> profiles; CompareYson(R"([ [[202u];["Value2"];["Value-202"]]; [[301u];["Value1"];["Value-301"]]; [[302u];["Value2"];["Value-302"]] - ])", StreamResultToYson(result, &profiles)); - - UNIT_ASSERT_EQUAL(1, profiles.size()); - - NYql::NDqProto::TDqExecutionStats stats; - google::protobuf::TextFormat::ParseFromString(profiles[0], &stats); - UNIT_ASSERT(stats.IsInitialized()); - - ui32 blockedByCapacity = 0; - for (const auto& stage : stats.GetStages()) { - for (const auto& ca : stage.GetComputeActors()) { - for (const auto& task : ca.GetTasks()) { - for (const auto& output : task.GetOutputChannels()) { - blockedByCapacity += output.GetBlockedByCapacity(); - } - } - } + ])", res.ResultSetYson); + + NJson::TJsonValue plan; + NJson::ReadJsonTree(*res.PlanJson, &plan, true); + + ui32 writesBlockedNoSpace = 0; + auto nodes = FindPlanNodes(plan, "WritesBlockedNoSpace"); + for (auto& node : nodes) { + writesBlockedNoSpace += node.GetIntegerSafe(); } - UNIT_ASSERT_EQUAL(hasBlockedByCapacity, blockedByCapacity > 0); + UNIT_ASSERT_EQUAL(hasBlockedByCapacity, writesBlockedNoSpace > 0); } Y_UNIT_TEST_TWIN(FlowControl_Unlimited, UseSessionActor) { @@ -184,8 +173,6 @@ void SlowClient() { UNIT_ASSERT_EQUAL(kqpCounters.RmMemory->Val(), 0); } -#endif - } // suite } // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/ut/kqp_scan_ut.cpp b/ydb/core/kqp/ut/kqp_scan_ut.cpp index cf45985c599..2087a2b1e55 100644 --- a/ydb/core/kqp/ut/kqp_scan_ut.cpp +++ b/ydb/core/kqp/ut/kqp_scan_ut.cpp @@ -1130,34 +1130,26 @@ Y_UNIT_TEST_SUITE(KqpScan) { auto cfg = AppCfg(); cfg.MutableFeatureFlags()->SetEnablePredicateExtractForScanQueries(WithPredicatesExtract); auto kikimr = KikimrRunnerEnableSessionActor(UseSessionActor, {}, cfg); - NExperimental::TStreamQueryClient db(kikimr.GetDriver()); + auto db = kikimr.GetTableClient(); - auto settings = NExperimental::TExecuteStreamQuerySettings() - .ProfileMode(NExperimental::EStreamQueryProfileMode::Basic); + TStreamExecScanQuerySettings settings; + settings.CollectQueryStats(ECollectQueryStatsMode::Basic); // simple key { - auto it = db.ExecuteStreamQuery(Sprintf(R"( + auto it = db.StreamExecuteScanQuery(Sprintf(R"( PRAGMA Kikimr.OptEnablePredicateExtract = '%s'; SELECT * FROM `/Root/EightShard` WHERE Key = 301; )", WithPredicatesExtract ? "true" : "false"), settings).GetValueSync(); UNIT_ASSERT(it.IsSuccess()); - TVector<TString> profiles; - CompareYson(R"([[[3];[301u];["Value1"]]])", StreamResultToYson(it, &profiles)); - - UNIT_ASSERT_EQUAL(1, profiles.size()); + auto res = CollectStreamResult(it); + CompareYson(R"([[[3];[301u];["Value1"]]])", res.ResultSetYson); - { - NYql::NDqProto::TDqExecutionStats stats; - google::protobuf::TextFormat::ParseFromString(profiles[0], &stats); - UNIT_ASSERT(stats.IsInitialized()); - - NKqpProto::TKqpExecutionExtraStats extraStats; - UNIT_ASSERT(stats.GetExtra().UnpackTo(&extraStats)); - UNIT_ASSERT_VALUES_EQUAL_C(extraStats.GetAffectedShards(), 1, "" << stats.DebugString()); - } + UNIT_ASSERT(res.QueryStats); + UNIT_ASSERT_VALUES_EQUAL(res.QueryStats->query_phases().size(), 1); + UNIT_ASSERT_VALUES_EQUAL(res.QueryStats->query_phases(0).affected_shards(), 1); } // complex key @@ -1166,7 +1158,7 @@ Y_UNIT_TEST_SUITE(KqpScan) { .AddParam("$ts").Int64(2).Build() .Build(); - auto it = db.ExecuteStreamQuery(Sprintf(R"( + auto it = db.StreamExecuteScanQuery(Sprintf(R"( PRAGMA Kikimr.OptEnablePredicateExtract = '%s'; DECLARE $ts AS Int64; SELECT * FROM `/Root/Logs` WHERE App = "nginx" AND Ts > $ts @@ -1174,65 +1166,42 @@ Y_UNIT_TEST_SUITE(KqpScan) { UNIT_ASSERT(it.IsSuccess()); - TVector<TString> profiles; - CompareYson(R"([[["nginx"];["nginx-23"];["GET /cat.jpg HTTP/1.1"];[3]]])", StreamResultToYson(it, &profiles)); - - UNIT_ASSERT_EQUAL(1, profiles.size()); - - { - NYql::NDqProto::TDqExecutionStats stats; - google::protobuf::TextFormat::ParseFromString(profiles[0], &stats); - UNIT_ASSERT(stats.IsInitialized()); + auto res = CollectStreamResult(it); + CompareYson(R"([[["nginx"];["nginx-23"];["GET /cat.jpg HTTP/1.1"];[3]]])", res.ResultSetYson); - NKqpProto::TKqpExecutionExtraStats extraStats; - UNIT_ASSERT(stats.GetExtra().UnpackTo(&extraStats)); - UNIT_ASSERT_VALUES_EQUAL_C(extraStats.GetAffectedShards(), 1, "" << stats.DebugString()); - } + UNIT_ASSERT(res.QueryStats); + UNIT_ASSERT_VALUES_EQUAL(res.QueryStats->query_phases().size(), 1); + UNIT_ASSERT_VALUES_EQUAL(res.QueryStats->query_phases(0).affected_shards(), 1); } } Y_UNIT_TEST_TWIN(PrunePartitionsByExpr, UseSessionActor) { auto kikimr = KikimrRunnerEnableSessionActor(UseSessionActor, {}, AppCfg()); - NExperimental::TStreamQueryClient db(kikimr.GetDriver()); + auto db = kikimr.GetTableClient(); - auto settings = NExperimental::TExecuteStreamQuerySettings() - .ProfileMode(NExperimental::EStreamQueryProfileMode::Basic); + TStreamExecScanQuerySettings settings; + settings.CollectQueryStats(ECollectQueryStatsMode::Basic); auto params = TParamsBuilder() .AddParam("$key").Uint64(300).Build() .Build(); - auto it = db.ExecuteStreamQuery(R"( + auto it = db.StreamExecuteScanQuery(R"( DECLARE $key AS Uint64; SELECT * FROM `/Root/EightShard` WHERE Key = $key + 1; )", params, settings).GetValueSync(); UNIT_ASSERT(it.IsSuccess()); - TVector<TString> profiles; - CompareYson(R"([[[3];[301u];["Value1"]]])", StreamResultToYson(it, &profiles)); - - UNIT_ASSERT_EQUAL(2, profiles.size()); - - { - NYql::NDqProto::TDqExecutionStats stats; - google::protobuf::TextFormat::ParseFromString(profiles[0], &stats); - UNIT_ASSERT(stats.IsInitialized()); - - NKqpProto::TKqpExecutionExtraStats extraStats; - UNIT_ASSERT(stats.GetExtra().UnpackTo(&extraStats)); - UNIT_ASSERT_VALUES_EQUAL(extraStats.GetAffectedShards(), 0); - } - - { - NYql::NDqProto::TDqExecutionStats stats; - google::protobuf::TextFormat::ParseFromString(profiles[1], &stats); - UNIT_ASSERT(stats.IsInitialized()); + auto res = CollectStreamResult(it); + CompareYson(R"([ + [[3];[301u];["Value1"]] + ])", res.ResultSetYson); - NKqpProto::TKqpExecutionExtraStats extraStats; - UNIT_ASSERT(stats.GetExtra().UnpackTo(&extraStats)); - UNIT_ASSERT_VALUES_EQUAL(extraStats.GetAffectedShards(), 1); - } + UNIT_ASSERT(res.QueryStats); + UNIT_ASSERT_VALUES_EQUAL(res.QueryStats->query_phases().size(), 2); + UNIT_ASSERT_VALUES_EQUAL(res.QueryStats->query_phases(0).affected_shards(), 0); + UNIT_ASSERT_VALUES_EQUAL(res.QueryStats->query_phases(1).affected_shards(), 1); } Y_UNIT_TEST_TWIN(TooManyComputeActors, UseSessionActor) { diff --git a/ydb/core/kqp/ut/kqp_stats_ut.cpp b/ydb/core/kqp/ut/kqp_stats_ut.cpp index 8a7406754ce..6202b0ba114 100644 --- a/ydb/core/kqp/ut/kqp_stats_ut.cpp +++ b/ydb/core/kqp/ut/kqp_stats_ut.cpp @@ -16,17 +16,21 @@ Y_UNIT_TEST_SUITE(KqpStats) { Y_UNIT_TEST_TWIN(MultiTxStatsFullExp, UseSessionActor) { auto kikimr = KikimrRunnerEnableSessionActor(UseSessionActor); - NExperimental::TStreamQueryClient db{kikimr.GetDriver()}; - auto settings = NExperimental::TExecuteStreamQuerySettings(); - settings.ProfileMode(NYdb::NExperimental::EStreamQueryProfileMode::Full); + auto db = kikimr.GetTableClient(); + + TStreamExecScanQuerySettings settings; + settings.CollectQueryStats(ECollectQueryStatsMode::Profile); - auto it = db.ExecuteStreamQuery(R"( + auto it = db.StreamExecuteScanQuery(R"( SELECT * FROM `/Root/EightShard` WHERE Key BETWEEN 150 AND 266 ORDER BY Data LIMIT 4; )", settings).GetValueSync(); auto res = CollectStreamResult(it); - UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString()); - UNIT_ASSERT_VALUES_EQUAL(res.ResultSetYson, R"([[[1];[202u];["Value2"]];[[2];[201u];["Value1"]];[[3];[203u];["Value3"]]])"); + CompareYson(R"([ + [[1];[202u];["Value2"]]; + [[2];[201u];["Value1"]]; + [[3];[203u];["Value3"]] + ])", res.ResultSetYson); UNIT_ASSERT(res.PlanJson); NJson::TJsonValue plan; diff --git a/ydb/core/kqp/ut/kqp_sys_view_ut.cpp b/ydb/core/kqp/ut/kqp_sys_view_ut.cpp index 23a0b6ca2a5..3ba730c872b 100644 --- a/ydb/core/kqp/ut/kqp_sys_view_ut.cpp +++ b/ydb/core/kqp/ut/kqp_sys_view_ut.cpp @@ -380,6 +380,52 @@ Y_UNIT_TEST_SUITE(KqpSystemView) { checkTable("`/Root/.sys/top_queries_by_cpu_time_one_hour`"); } + Y_UNIT_TEST_TWIN(QueryStatsScan, UseSessionActor) { + auto checkTable = [&] (const TStringBuf tableName) { + auto kikimr = KikimrRunnerEnableSessionActor(UseSessionActor); + auto client = kikimr.GetTableClient(); + + { + auto it = kikimr.GetTableClient().StreamExecuteScanQuery(R"( + SELECT COUNT(*) FROM `/Root/EightShard` + )").GetValueSync(); + + UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString()); + CompareYson(R"([[24u]])", StreamResultToYson(it)); + } + + TStringStream request; + request << "SELECT ReadBytes FROM " << tableName << " ORDER BY ReadBytes"; + + auto it = client.StreamExecuteScanQuery(request.Str()).GetValueSync(); + UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString()); + + TSet<ui64> readBytesSet; + for (;;) { + auto streamPart = it.ReadNext().GetValueSync(); + if (!streamPart.IsSuccess()) { + UNIT_ASSERT_C(streamPart.EOS(), streamPart.GetIssues().ToString()); + break; + } + + if (streamPart.HasResultSet()) { + auto resultSet = streamPart.ExtractResultSet(); + + NYdb::TResultSetParser parser(resultSet); + while (parser.TryNextRow()) { + auto value = parser.ColumnParser("ReadBytes").GetOptionalUint64(); + UNIT_ASSERT(value); + readBytesSet.emplace(*value); + } + } + } + + UNIT_ASSERT(readBytesSet.contains(192)); // EightShard + }; + + checkTable("`/Root/.sys/top_queries_by_read_bytes_one_minute`"); + } + Y_UNIT_TEST(FailNavigate) { TKikimrRunner kikimr("user0@builtin"); auto client = kikimr.GetTableClient(); diff --git a/ydb/core/kqp/ut/kqp_table_predicate_ut.cpp b/ydb/core/kqp/ut/kqp_table_predicate_ut.cpp index e5dfdd15767..1550438a757 100644 --- a/ydb/core/kqp/ut/kqp_table_predicate_ut.cpp +++ b/ydb/core/kqp/ut/kqp_table_predicate_ut.cpp @@ -181,32 +181,29 @@ void CreateTableWithIntKey(TSession session, ui64 partitions, ui32 rangesPerPart UNIT_ASSERT(success); } -void ExecuteStreamQueryAndCheck(NExperimental::TStreamQueryClient& db, const TString& query, +void ExecuteStreamQueryAndCheck(NYdb::NTable::TTableClient& db, const TString& query, const TString& expectedYson) { - auto settings = NExperimental::TExecuteStreamQuerySettings() - .ProfileMode(NExperimental::EStreamQueryProfileMode::Basic); + TStreamExecScanQuerySettings settings; + settings.CollectQueryStats(ECollectQueryStatsMode::Basic); - auto it = db.ExecuteStreamQuery(query, settings).GetValueSync(); + auto it = db.StreamExecuteScanQuery(query, settings).GetValueSync(); UNIT_ASSERT(it.IsSuccess()); - TVector<TString> profiles; - auto resultYson = StreamResultToYson(it, &profiles); + auto res = CollectStreamResult(it); Cerr << "---------QUERY----------" << Endl; Cerr << query << Endl; Cerr << "---------RESULT---------" << Endl; - Cerr << resultYson << Endl; + Cerr << res.ResultSetYson << Endl; Cerr << "------------------------" << Endl; - CompareYson(expectedYson, resultYson); + CompareYson(expectedYson, res.ResultSetYson); - NYql::NDqProto::TDqExecutionStats stats; - // First stage is computation, second scan read. - google::protobuf::TextFormat::ParseFromString(profiles.back(), &stats); + UNIT_ASSERT(res.QueryStats); + ui64 readRows = res.QueryStats->query_phases().rbegin()->table_access(0).reads().rows(); + ui64 resultRows = res.RowsCount; - ui64 resultRows = stats.GetResultRows(); - ui64 readRows = stats.GetTables(0).GetReadRows(); UNIT_ASSERT_EQUAL_C(resultRows, readRows, "There are " << resultRows << " in result, but read " << readRows << " !"); } @@ -214,10 +211,8 @@ void RunTestOverIntTable(const TString& query, const TString& expectedYson, ui64 TKikimrSettings kikimrSettings; TKikimrRunner kikimr(kikimrSettings); - NExperimental::TStreamQueryClient db(kikimr.GetDriver()); - - auto client = kikimr.GetTableClient(); - auto session = client.CreateSession().GetValueSync().GetSession(); + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); CreateTableWithIntKey(session, partitions, rangesPerPartition); ExecuteStreamQueryAndCheck(db, query, expectedYson); @@ -1220,7 +1215,6 @@ Y_UNIT_TEST_SUITE(KqpTablePredicate) { Y_UNIT_TEST(NoFullScanAtDNFPredicate) { TKikimrRunner kikimr; - NExperimental::TStreamQueryClient streamDb(kikimr.GetDriver()); auto db = kikimr.GetTableClient(); auto session = db.CreateSession().GetValueSync().GetSession(); @@ -1271,7 +1265,7 @@ Y_UNIT_TEST_SUITE(KqpTablePredicate) { ORDER BY Value; )"); SubstGlobal(query, "<PREDICATE>", data.first); - ExecuteStreamQueryAndCheck(streamDb, query, data.second); + ExecuteStreamQueryAndCheck(db, query, data.second); } } |