summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSergei Puchin <[email protected]>2022-06-19 00:13:19 +0300
committerSergei Puchin <[email protected]>2022-06-19 00:13:19 +0300
commit8e2e903d222a17b6ba3da50f43d0e6bf49f8d583 (patch)
treefdeaea62fb7f4b02ddb0c23e628fe23f68f07847
parentd28ffff79b7e2c859289d62a313c978a6d009f5d (diff)
Fix query stats reporting for ScanQueries. (KIKIMR-15013)
ref:351908dc7be3e8df9e53d043f79f8638c509fae9
-rw-r--r--ydb/core/kqp/host/kqp_host.cpp4
-rw-r--r--ydb/core/kqp/host/kqp_run_scan.cpp5
-rw-r--r--ydb/core/kqp/host/kqp_runner.cpp13
-rw-r--r--ydb/core/kqp/kqp_ic_gateway.cpp8
-rw-r--r--ydb/core/kqp/kqp_session_actor.cpp47
-rw-r--r--ydb/core/kqp/ut/common/kqp_ut_common.cpp28
-rw-r--r--ydb/core/kqp/ut/common/kqp_ut_common.h2
-rw-r--r--ydb/core/kqp/ut/kqp_ne_flowcontrol_ut.cpp47
-rw-r--r--ydb/core/kqp/ut/kqp_scan_ut.cpp85
-rw-r--r--ydb/core/kqp/ut/kqp_stats_ut.cpp16
-rw-r--r--ydb/core/kqp/ut/kqp_sys_view_ut.cpp46
-rw-r--r--ydb/core/kqp/ut/kqp_table_predicate_ut.cpp32
12 files changed, 161 insertions, 172 deletions
diff --git a/ydb/core/kqp/host/kqp_host.cpp b/ydb/core/kqp/host/kqp_host.cpp
index 97b73df73a5..ee6183d30a6 100644
--- a/ydb/core/kqp/host/kqp_host.cpp
+++ b/ydb/core/kqp/host/kqp_host.cpp
@@ -283,10 +283,6 @@ public:
queryResult.PreparedQuery = Query;
}
- /*
- * Set stats and plan for DataQuery. In case of ScanQuery they will be set
- * later in TStreamExecuteScanQueryRPC.
- */
if (ExecuteCtx->QueryResults.size() == 1) {
auto& execResult = ExecuteCtx->QueryResults[0];
queryResult.QueryStats.Swap(&execResult.QueryStats);
diff --git a/ydb/core/kqp/host/kqp_run_scan.cpp b/ydb/core/kqp/host/kqp_run_scan.cpp
index caad430c281..38a67850eb2 100644
--- a/ydb/core/kqp/host/kqp_run_scan.cpp
+++ b/ydb/core/kqp/host/kqp_run_scan.cpp
@@ -57,10 +57,13 @@ protected:
}
bool OnExecuterResult(NKikimrKqp::TExecuterTxResult&& execResult, TExprContext& ctx, bool commit) override {
- Y_UNUSED(execResult);
Y_UNUSED(ctx);
Y_UNUSED(commit);
+ if (execResult.HasStats()) {
+ TransformCtx->QueryStats.AddExecutions()->Swap(execResult.MutableStats());
+ }
+
return true;
}
};
diff --git a/ydb/core/kqp/host/kqp_runner.cpp b/ydb/core/kqp/host/kqp_runner.cpp
index 64ad34e606b..0f6abe1d13b 100644
--- a/ydb/core/kqp/host/kqp_runner.cpp
+++ b/ydb/core/kqp/host/kqp_runner.cpp
@@ -97,12 +97,17 @@ class TScanAsyncRunResult : public TKqpAsyncResultBase<IKikimrQueryExecutor::TQu
public:
using TResult = IKikimrQueryExecutor::TQueryResult;
- TScanAsyncRunResult(const TExprNode::TPtr& queryRoot, TExprContext& exprCtx, IGraphTransformer& transformer)
- : TKqpAsyncResultBase(queryRoot, exprCtx, transformer) {}
+ TScanAsyncRunResult(const TExprNode::TPtr& queryRoot, TExprContext& exprCtx, IGraphTransformer& transformer,
+ const TKqlTransformContext& transformCtx)
+ : TKqpAsyncResultBase(queryRoot, exprCtx, transformer)
+ , TransformCtx(transformCtx) {}
void FillResult(TResult& queryResult) const override {
- Y_UNUSED(queryResult);
+ queryResult.QueryStats.CopyFrom(TransformCtx.QueryStats);
}
+
+private:
+ const TKqlTransformContext& TransformCtx;
};
class TKqpIterationGuardTransformer : public TSyncTransformerBase {
@@ -609,7 +614,7 @@ private:
Y_ASSERT(!TxState->Tx().GetSnapshot().IsValid());
- return MakeIntrusive<TScanAsyncRunResult>(world, ctx, *ScanRunQueryTransformer);
+ return MakeIntrusive<TScanAsyncRunResult>(world, ctx, *ScanRunQueryTransformer, *TransformCtx);
}
static bool MergeFlagValue(const TMaybe<bool>& configFlag, const TMaybe<bool>& flag) {
diff --git a/ydb/core/kqp/kqp_ic_gateway.cpp b/ydb/core/kqp/kqp_ic_gateway.cpp
index f99e1333b39..2e038d4bbf3 100644
--- a/ydb/core/kqp/kqp_ic_gateway.cpp
+++ b/ydb/core/kqp/kqp_ic_gateway.cpp
@@ -882,14 +882,6 @@ private:
result.ExecuterResult.Swap(response->MutableResult());
- if (Target && result.ExecuterResult.HasStats()) {
- auto statsEv = MakeHolder<TEvKqpExecuter::TEvStreamProfile>();
- auto& record = statsEv->Record;
-
- record.MutableProfile()->Swap(result.ExecuterResult.MutableStats());
- this->Send(Target, statsEv.Release());
- }
-
Promise.SetValue(std::move(result));
this->PassAway();
}
diff --git a/ydb/core/kqp/kqp_session_actor.cpp b/ydb/core/kqp/kqp_session_actor.cpp
index 4e59a6ae6f0..81519273875 100644
--- a/ydb/core/kqp/kqp_session_actor.cpp
+++ b/ydb/core/kqp/kqp_session_actor.cpp
@@ -1070,24 +1070,14 @@ public:
return;
}
- bool scan = QueryState->Request.GetType() == NKikimrKqp::QUERY_TYPE_SQL_SCAN;
- if (scan) {
- if (QueryState->RequestActorId && txResult.HasStats()) {
- auto statsEv = MakeHolder<TEvKqpExecuter::TEvStreamProfile>();
- auto& record = statsEv->Record;
-
- record.MutableProfile()->Swap(txResult.MutableStats());
- Send(QueryState->RequestActorId, statsEv.Release());
- }
- } else {
- if (txResult.HasStats()) {
- auto* exec = QueryState->Stats.AddExecutions();
- exec->Swap(txResult.MutableStats());
- }
+ if (txResult.HasStats()) {
+ auto* exec = QueryState->Stats.AddExecutions();
+ exec->Swap(txResult.MutableStats());
}
if (QueryState->PreparedQuery &&
- QueryState->CurrentTx < QueryState->PreparedQuery->GetPhysicalQuery().TransactionsSize()) {
+ QueryState->CurrentTx < QueryState->PreparedQuery->GetPhysicalQuery().TransactionsSize())
+ {
ExecuteOrDefer();
} else {
ReplySuccess();
@@ -1203,32 +1193,6 @@ public:
}
}
- // TODO: Remove? Is it actual for NewEngine?
- void FillQueryProfile(const NKqpProto::TKqpStatsQuery& stats, NKikimrKqp::TQueryResponse& response) {
- auto& kqlProfile = *response.MutableProfile()->AddKqlProfiles();
- for (auto& execStats : stats.GetExecutions()) {
- auto& txStats = *kqlProfile.AddMkqlProfiles()->MutableTxStats();
-
- txStats.SetDurationUs(execStats.GetDurationUs());
- for (auto& tableStats : execStats.GetTables()) {
- auto& txTableStats = *txStats.AddTableAccessStats();
-
- txTableStats.MutableTableInfo()->SetName(tableStats.GetTablePath());
- if (tableStats.GetReadRows() > 0) {
- txTableStats.MutableSelectRange()->SetRows(tableStats.GetReadRows());
- txTableStats.MutableSelectRange()->SetBytes(tableStats.GetReadBytes());
- }
- if (tableStats.GetWriteRows() > 0) {
- txTableStats.MutableUpdateRow()->SetCount(tableStats.GetWriteRows());
- txTableStats.MutableUpdateRow()->SetBytes(tableStats.GetWriteBytes());
- }
- if (tableStats.GetEraseRows() > 0) {
- txTableStats.MutableEraseRow()->SetCount(tableStats.GetEraseRows());
- }
- }
- }
- }
-
void FillStats(NKikimrKqp::TEvQueryResponse* record) {
auto *response = record->MutableResponse();
auto* stats = &QueryState->Stats;
@@ -1263,7 +1227,6 @@ public:
bool reportStats = (GetStatsModeInt(queryRequest) != EKikimrStatsMode::None);
if (reportStats) {
- FillQueryProfile(*stats, *response);
response->SetQueryPlan(SerializeAnalyzePlan(*stats));
response->MutableQueryStats()->Swap(stats);
diff --git a/ydb/core/kqp/ut/common/kqp_ut_common.cpp b/ydb/core/kqp/ut/common/kqp_ut_common.cpp
index 27e75b5fc90..4840e8e2643 100644
--- a/ydb/core/kqp/ut/common/kqp_ut_common.cpp
+++ b/ydb/core/kqp/ut/common/kqp_ut_common.cpp
@@ -687,6 +687,7 @@ TCollectedStreamResult CollectStreamResultImpl(TIterator& it) {
if (streamPart.HasResultSet()) {
auto resultSet = streamPart.ExtractResultSet();
PrintResultSet(resultSet, resultSetWriter);
+ res.RowsCount += resultSet.RowsCount();
}
if constexpr (std::is_same_v<TIterator, NYdb::NTable::TScanQueryPartIterator>) {
@@ -822,6 +823,33 @@ NJson::TJsonValue FindPlanNodeByKv(const NJson::TJsonValue& plan, const TString&
return NJson::TJsonValue();
}
+void FindPlanNodesImpl(const NJson::TJsonValue& node, const TString& key, std::vector<NJson::TJsonValue>& results) {
+ if (node.IsArray()) {
+ for (const auto& item: node.GetArray()) {
+ FindPlanNodesImpl(item, key, results);
+ }
+ }
+
+ if (!node.IsMap()) {
+ return;
+ }
+
+ auto map = node.GetMap();
+ if (map.contains(key)) {
+ results.push_back(map.at(key));
+ }
+
+ for (const auto& [_, value]: map) {
+ FindPlanNodesImpl(value, key, results);
+ }
+}
+
+std::vector<NJson::TJsonValue> FindPlanNodes(const NJson::TJsonValue& plan, const TString& key) {
+ std::vector<NJson::TJsonValue> results;
+ FindPlanNodesImpl(plan, key, results);
+ return results;
+}
+
void CreateSampleTablesWithIndex(TSession& session) {
auto res = session.ExecuteSchemeQuery(R"(
--!syntax_v1
diff --git a/ydb/core/kqp/ut/common/kqp_ut_common.h b/ydb/core/kqp/ut/common/kqp_ut_common.h
index c1ea5995d9e..07d94ee25a1 100644
--- a/ydb/core/kqp/ut/common/kqp_ut_common.h
+++ b/ydb/core/kqp/ut/common/kqp_ut_common.h
@@ -173,6 +173,7 @@ struct TCollectedStreamResult {
TString ResultSetYson;
TMaybe<TString> PlanJson;
TMaybe<Ydb::TableStats::QueryStats> QueryStats;
+ ui64 RowsCount = 0;
};
TCollectedStreamResult CollectStreamResult(NYdb::NExperimental::TStreamPartIterator& it);
@@ -243,6 +244,7 @@ TString StreamResultToYson(NYdb::NTable::TTablePartIterator& it);
ui32 CountPlanNodesByKv(const NJson::TJsonValue& plan, const TString& key, const TString& value);
NJson::TJsonValue FindPlanNodeByKv(const NJson::TJsonValue& plan, const TString& key, const TString& value);
+std::vector<NJson::TJsonValue> FindPlanNodes(const NJson::TJsonValue& plan, const TString& key);
TString ReadTableToYson(NYdb::NTable::TSession session, const TString& table);
TString ReadTablePartToYson(NYdb::NTable::TSession session, const TString& table);
diff --git a/ydb/core/kqp/ut/kqp_ne_flowcontrol_ut.cpp b/ydb/core/kqp/ut/kqp_ne_flowcontrol_ut.cpp
index 2aceabbd3f9..e9d6133b51f 100644
--- a/ydb/core/kqp/ut/kqp_ne_flowcontrol_ut.cpp
+++ b/ydb/core/kqp/ut/kqp_ne_flowcontrol_ut.cpp
@@ -49,8 +49,6 @@ void CreateSampleTables(TKikimrRunner& kikimr) {
Y_UNIT_TEST_SUITE(KqpFlowControl) {
-#if !defined(MKQL_RUNTIME_VERSION) || MKQL_RUNTIME_VERSION >= 9u
-
void DoFlowControlTest(ui64 limit, bool hasBlockedByCapacity, bool useSessionActor) {
NKikimrConfig::TAppConfig appCfg;
appCfg.MutableTableServiceConfig()->MutableResourceManager()->SetChannelBufferSize(limit);
@@ -59,11 +57,9 @@ void DoFlowControlTest(ui64 limit, bool hasBlockedByCapacity, bool useSessionAct
appCfg.MutableTableServiceConfig()->MutableResourceManager()->SetMkqlHeavyProgramMemoryLimit(200ul << 20);
appCfg.MutableTableServiceConfig()->MutableResourceManager()->SetQueryMemoryLimit(20ul << 30);
- //TKikimrRunner kikimr{appCfg, "", KikimrDefaultUtDomainRoot};
auto kikimr = KikimrRunnerEnableSessionActor(useSessionActor, {}, appCfg);
-
CreateSampleTables(kikimr);
- NExperimental::TStreamQueryClient db(kikimr.GetDriver());
+ auto db = kikimr.GetTableClient();
Y_DEFER {
NYql::NDq::GetDqExecutionSettingsForTests().Reset();
@@ -72,10 +68,10 @@ void DoFlowControlTest(ui64 limit, bool hasBlockedByCapacity, bool useSessionAct
NYql::NDq::GetDqExecutionSettingsForTests().FlowControl.MaxOutputChunkSize = limit;
NYql::NDq::GetDqExecutionSettingsForTests().FlowControl.InFlightBytesOvercommit = 1.0f;
- auto settings = NExperimental::TExecuteStreamQuerySettings()
- .ProfileMode(NExperimental::EStreamQueryProfileMode::Profile);
+ TStreamExecScanQuerySettings settings;
+ settings.CollectQueryStats(ECollectQueryStatsMode::Profile);
- auto result = db.ExecuteStreamQuery(R"(
+ auto it = db.StreamExecuteScanQuery(R"(
$r = (select * from `/Root/FourShard` where Key > 201);
SELECT l.Key as key, l.Text as text, r.Value1 as value
@@ -83,33 +79,26 @@ void DoFlowControlTest(ui64 limit, bool hasBlockedByCapacity, bool useSessionAct
ORDER BY key, text, value
)", settings).GetValueSync();
- UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
+ UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString());
+
+ auto res = CollectStreamResult(it);
- TVector<TString> profiles;
CompareYson(R"([
[[202u];["Value2"];["Value-202"]];
[[301u];["Value1"];["Value-301"]];
[[302u];["Value2"];["Value-302"]]
- ])", StreamResultToYson(result, &profiles));
-
- UNIT_ASSERT_EQUAL(1, profiles.size());
-
- NYql::NDqProto::TDqExecutionStats stats;
- google::protobuf::TextFormat::ParseFromString(profiles[0], &stats);
- UNIT_ASSERT(stats.IsInitialized());
-
- ui32 blockedByCapacity = 0;
- for (const auto& stage : stats.GetStages()) {
- for (const auto& ca : stage.GetComputeActors()) {
- for (const auto& task : ca.GetTasks()) {
- for (const auto& output : task.GetOutputChannels()) {
- blockedByCapacity += output.GetBlockedByCapacity();
- }
- }
- }
+ ])", res.ResultSetYson);
+
+ NJson::TJsonValue plan;
+ NJson::ReadJsonTree(*res.PlanJson, &plan, true);
+
+ ui32 writesBlockedNoSpace = 0;
+ auto nodes = FindPlanNodes(plan, "WritesBlockedNoSpace");
+ for (auto& node : nodes) {
+ writesBlockedNoSpace += node.GetIntegerSafe();
}
- UNIT_ASSERT_EQUAL(hasBlockedByCapacity, blockedByCapacity > 0);
+ UNIT_ASSERT_EQUAL(hasBlockedByCapacity, writesBlockedNoSpace > 0);
}
Y_UNIT_TEST_TWIN(FlowControl_Unlimited, UseSessionActor) {
@@ -184,8 +173,6 @@ void SlowClient() {
UNIT_ASSERT_EQUAL(kqpCounters.RmMemory->Val(), 0);
}
-#endif
-
} // suite
} // namespace NKikimr::NKqp
diff --git a/ydb/core/kqp/ut/kqp_scan_ut.cpp b/ydb/core/kqp/ut/kqp_scan_ut.cpp
index cf45985c599..2087a2b1e55 100644
--- a/ydb/core/kqp/ut/kqp_scan_ut.cpp
+++ b/ydb/core/kqp/ut/kqp_scan_ut.cpp
@@ -1130,34 +1130,26 @@ Y_UNIT_TEST_SUITE(KqpScan) {
auto cfg = AppCfg();
cfg.MutableFeatureFlags()->SetEnablePredicateExtractForScanQueries(WithPredicatesExtract);
auto kikimr = KikimrRunnerEnableSessionActor(UseSessionActor, {}, cfg);
- NExperimental::TStreamQueryClient db(kikimr.GetDriver());
+ auto db = kikimr.GetTableClient();
- auto settings = NExperimental::TExecuteStreamQuerySettings()
- .ProfileMode(NExperimental::EStreamQueryProfileMode::Basic);
+ TStreamExecScanQuerySettings settings;
+ settings.CollectQueryStats(ECollectQueryStatsMode::Basic);
// simple key
{
- auto it = db.ExecuteStreamQuery(Sprintf(R"(
+ auto it = db.StreamExecuteScanQuery(Sprintf(R"(
PRAGMA Kikimr.OptEnablePredicateExtract = '%s';
SELECT * FROM `/Root/EightShard` WHERE Key = 301;
)", WithPredicatesExtract ? "true" : "false"), settings).GetValueSync();
UNIT_ASSERT(it.IsSuccess());
- TVector<TString> profiles;
- CompareYson(R"([[[3];[301u];["Value1"]]])", StreamResultToYson(it, &profiles));
-
- UNIT_ASSERT_EQUAL(1, profiles.size());
+ auto res = CollectStreamResult(it);
+ CompareYson(R"([[[3];[301u];["Value1"]]])", res.ResultSetYson);
- {
- NYql::NDqProto::TDqExecutionStats stats;
- google::protobuf::TextFormat::ParseFromString(profiles[0], &stats);
- UNIT_ASSERT(stats.IsInitialized());
-
- NKqpProto::TKqpExecutionExtraStats extraStats;
- UNIT_ASSERT(stats.GetExtra().UnpackTo(&extraStats));
- UNIT_ASSERT_VALUES_EQUAL_C(extraStats.GetAffectedShards(), 1, "" << stats.DebugString());
- }
+ UNIT_ASSERT(res.QueryStats);
+ UNIT_ASSERT_VALUES_EQUAL(res.QueryStats->query_phases().size(), 1);
+ UNIT_ASSERT_VALUES_EQUAL(res.QueryStats->query_phases(0).affected_shards(), 1);
}
// complex key
@@ -1166,7 +1158,7 @@ Y_UNIT_TEST_SUITE(KqpScan) {
.AddParam("$ts").Int64(2).Build()
.Build();
- auto it = db.ExecuteStreamQuery(Sprintf(R"(
+ auto it = db.StreamExecuteScanQuery(Sprintf(R"(
PRAGMA Kikimr.OptEnablePredicateExtract = '%s';
DECLARE $ts AS Int64;
SELECT * FROM `/Root/Logs` WHERE App = "nginx" AND Ts > $ts
@@ -1174,65 +1166,42 @@ Y_UNIT_TEST_SUITE(KqpScan) {
UNIT_ASSERT(it.IsSuccess());
- TVector<TString> profiles;
- CompareYson(R"([[["nginx"];["nginx-23"];["GET /cat.jpg HTTP/1.1"];[3]]])", StreamResultToYson(it, &profiles));
-
- UNIT_ASSERT_EQUAL(1, profiles.size());
-
- {
- NYql::NDqProto::TDqExecutionStats stats;
- google::protobuf::TextFormat::ParseFromString(profiles[0], &stats);
- UNIT_ASSERT(stats.IsInitialized());
+ auto res = CollectStreamResult(it);
+ CompareYson(R"([[["nginx"];["nginx-23"];["GET /cat.jpg HTTP/1.1"];[3]]])", res.ResultSetYson);
- NKqpProto::TKqpExecutionExtraStats extraStats;
- UNIT_ASSERT(stats.GetExtra().UnpackTo(&extraStats));
- UNIT_ASSERT_VALUES_EQUAL_C(extraStats.GetAffectedShards(), 1, "" << stats.DebugString());
- }
+ UNIT_ASSERT(res.QueryStats);
+ UNIT_ASSERT_VALUES_EQUAL(res.QueryStats->query_phases().size(), 1);
+ UNIT_ASSERT_VALUES_EQUAL(res.QueryStats->query_phases(0).affected_shards(), 1);
}
}
Y_UNIT_TEST_TWIN(PrunePartitionsByExpr, UseSessionActor) {
auto kikimr = KikimrRunnerEnableSessionActor(UseSessionActor, {}, AppCfg());
- NExperimental::TStreamQueryClient db(kikimr.GetDriver());
+ auto db = kikimr.GetTableClient();
- auto settings = NExperimental::TExecuteStreamQuerySettings()
- .ProfileMode(NExperimental::EStreamQueryProfileMode::Basic);
+ TStreamExecScanQuerySettings settings;
+ settings.CollectQueryStats(ECollectQueryStatsMode::Basic);
auto params = TParamsBuilder()
.AddParam("$key").Uint64(300).Build()
.Build();
- auto it = db.ExecuteStreamQuery(R"(
+ auto it = db.StreamExecuteScanQuery(R"(
DECLARE $key AS Uint64;
SELECT * FROM `/Root/EightShard` WHERE Key = $key + 1;
)", params, settings).GetValueSync();
UNIT_ASSERT(it.IsSuccess());
- TVector<TString> profiles;
- CompareYson(R"([[[3];[301u];["Value1"]]])", StreamResultToYson(it, &profiles));
-
- UNIT_ASSERT_EQUAL(2, profiles.size());
-
- {
- NYql::NDqProto::TDqExecutionStats stats;
- google::protobuf::TextFormat::ParseFromString(profiles[0], &stats);
- UNIT_ASSERT(stats.IsInitialized());
-
- NKqpProto::TKqpExecutionExtraStats extraStats;
- UNIT_ASSERT(stats.GetExtra().UnpackTo(&extraStats));
- UNIT_ASSERT_VALUES_EQUAL(extraStats.GetAffectedShards(), 0);
- }
-
- {
- NYql::NDqProto::TDqExecutionStats stats;
- google::protobuf::TextFormat::ParseFromString(profiles[1], &stats);
- UNIT_ASSERT(stats.IsInitialized());
+ auto res = CollectStreamResult(it);
+ CompareYson(R"([
+ [[3];[301u];["Value1"]]
+ ])", res.ResultSetYson);
- NKqpProto::TKqpExecutionExtraStats extraStats;
- UNIT_ASSERT(stats.GetExtra().UnpackTo(&extraStats));
- UNIT_ASSERT_VALUES_EQUAL(extraStats.GetAffectedShards(), 1);
- }
+ UNIT_ASSERT(res.QueryStats);
+ UNIT_ASSERT_VALUES_EQUAL(res.QueryStats->query_phases().size(), 2);
+ UNIT_ASSERT_VALUES_EQUAL(res.QueryStats->query_phases(0).affected_shards(), 0);
+ UNIT_ASSERT_VALUES_EQUAL(res.QueryStats->query_phases(1).affected_shards(), 1);
}
Y_UNIT_TEST_TWIN(TooManyComputeActors, UseSessionActor) {
diff --git a/ydb/core/kqp/ut/kqp_stats_ut.cpp b/ydb/core/kqp/ut/kqp_stats_ut.cpp
index 8a7406754ce..6202b0ba114 100644
--- a/ydb/core/kqp/ut/kqp_stats_ut.cpp
+++ b/ydb/core/kqp/ut/kqp_stats_ut.cpp
@@ -16,17 +16,21 @@ Y_UNIT_TEST_SUITE(KqpStats) {
Y_UNIT_TEST_TWIN(MultiTxStatsFullExp, UseSessionActor) {
auto kikimr = KikimrRunnerEnableSessionActor(UseSessionActor);
- NExperimental::TStreamQueryClient db{kikimr.GetDriver()};
- auto settings = NExperimental::TExecuteStreamQuerySettings();
- settings.ProfileMode(NYdb::NExperimental::EStreamQueryProfileMode::Full);
+ auto db = kikimr.GetTableClient();
+
+ TStreamExecScanQuerySettings settings;
+ settings.CollectQueryStats(ECollectQueryStatsMode::Profile);
- auto it = db.ExecuteStreamQuery(R"(
+ auto it = db.StreamExecuteScanQuery(R"(
SELECT * FROM `/Root/EightShard` WHERE Key BETWEEN 150 AND 266 ORDER BY Data LIMIT 4;
)", settings).GetValueSync();
auto res = CollectStreamResult(it);
- UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString());
- UNIT_ASSERT_VALUES_EQUAL(res.ResultSetYson, R"([[[1];[202u];["Value2"]];[[2];[201u];["Value1"]];[[3];[203u];["Value3"]]])");
+ CompareYson(R"([
+ [[1];[202u];["Value2"]];
+ [[2];[201u];["Value1"]];
+ [[3];[203u];["Value3"]]
+ ])", res.ResultSetYson);
UNIT_ASSERT(res.PlanJson);
NJson::TJsonValue plan;
diff --git a/ydb/core/kqp/ut/kqp_sys_view_ut.cpp b/ydb/core/kqp/ut/kqp_sys_view_ut.cpp
index 23a0b6ca2a5..3ba730c872b 100644
--- a/ydb/core/kqp/ut/kqp_sys_view_ut.cpp
+++ b/ydb/core/kqp/ut/kqp_sys_view_ut.cpp
@@ -380,6 +380,52 @@ Y_UNIT_TEST_SUITE(KqpSystemView) {
checkTable("`/Root/.sys/top_queries_by_cpu_time_one_hour`");
}
+ Y_UNIT_TEST_TWIN(QueryStatsScan, UseSessionActor) {
+ auto checkTable = [&] (const TStringBuf tableName) {
+ auto kikimr = KikimrRunnerEnableSessionActor(UseSessionActor);
+ auto client = kikimr.GetTableClient();
+
+ {
+ auto it = kikimr.GetTableClient().StreamExecuteScanQuery(R"(
+ SELECT COUNT(*) FROM `/Root/EightShard`
+ )").GetValueSync();
+
+ UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString());
+ CompareYson(R"([[24u]])", StreamResultToYson(it));
+ }
+
+ TStringStream request;
+ request << "SELECT ReadBytes FROM " << tableName << " ORDER BY ReadBytes";
+
+ auto it = client.StreamExecuteScanQuery(request.Str()).GetValueSync();
+ UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString());
+
+ TSet<ui64> readBytesSet;
+ for (;;) {
+ auto streamPart = it.ReadNext().GetValueSync();
+ if (!streamPart.IsSuccess()) {
+ UNIT_ASSERT_C(streamPart.EOS(), streamPart.GetIssues().ToString());
+ break;
+ }
+
+ if (streamPart.HasResultSet()) {
+ auto resultSet = streamPart.ExtractResultSet();
+
+ NYdb::TResultSetParser parser(resultSet);
+ while (parser.TryNextRow()) {
+ auto value = parser.ColumnParser("ReadBytes").GetOptionalUint64();
+ UNIT_ASSERT(value);
+ readBytesSet.emplace(*value);
+ }
+ }
+ }
+
+ UNIT_ASSERT(readBytesSet.contains(192)); // EightShard
+ };
+
+ checkTable("`/Root/.sys/top_queries_by_read_bytes_one_minute`");
+ }
+
Y_UNIT_TEST(FailNavigate) {
TKikimrRunner kikimr("user0@builtin");
auto client = kikimr.GetTableClient();
diff --git a/ydb/core/kqp/ut/kqp_table_predicate_ut.cpp b/ydb/core/kqp/ut/kqp_table_predicate_ut.cpp
index e5dfdd15767..1550438a757 100644
--- a/ydb/core/kqp/ut/kqp_table_predicate_ut.cpp
+++ b/ydb/core/kqp/ut/kqp_table_predicate_ut.cpp
@@ -181,32 +181,29 @@ void CreateTableWithIntKey(TSession session, ui64 partitions, ui32 rangesPerPart
UNIT_ASSERT(success);
}
-void ExecuteStreamQueryAndCheck(NExperimental::TStreamQueryClient& db, const TString& query,
+void ExecuteStreamQueryAndCheck(NYdb::NTable::TTableClient& db, const TString& query,
const TString& expectedYson)
{
- auto settings = NExperimental::TExecuteStreamQuerySettings()
- .ProfileMode(NExperimental::EStreamQueryProfileMode::Basic);
+ TStreamExecScanQuerySettings settings;
+ settings.CollectQueryStats(ECollectQueryStatsMode::Basic);
- auto it = db.ExecuteStreamQuery(query, settings).GetValueSync();
+ auto it = db.StreamExecuteScanQuery(query, settings).GetValueSync();
UNIT_ASSERT(it.IsSuccess());
- TVector<TString> profiles;
- auto resultYson = StreamResultToYson(it, &profiles);
+ auto res = CollectStreamResult(it);
Cerr << "---------QUERY----------" << Endl;
Cerr << query << Endl;
Cerr << "---------RESULT---------" << Endl;
- Cerr << resultYson << Endl;
+ Cerr << res.ResultSetYson << Endl;
Cerr << "------------------------" << Endl;
- CompareYson(expectedYson, resultYson);
+ CompareYson(expectedYson, res.ResultSetYson);
- NYql::NDqProto::TDqExecutionStats stats;
- // First stage is computation, second scan read.
- google::protobuf::TextFormat::ParseFromString(profiles.back(), &stats);
+ UNIT_ASSERT(res.QueryStats);
+ ui64 readRows = res.QueryStats->query_phases().rbegin()->table_access(0).reads().rows();
+ ui64 resultRows = res.RowsCount;
- ui64 resultRows = stats.GetResultRows();
- ui64 readRows = stats.GetTables(0).GetReadRows();
UNIT_ASSERT_EQUAL_C(resultRows, readRows, "There are " << resultRows << " in result, but read " << readRows << " !");
}
@@ -214,10 +211,8 @@ void RunTestOverIntTable(const TString& query, const TString& expectedYson, ui64
TKikimrSettings kikimrSettings;
TKikimrRunner kikimr(kikimrSettings);
- NExperimental::TStreamQueryClient db(kikimr.GetDriver());
-
- auto client = kikimr.GetTableClient();
- auto session = client.CreateSession().GetValueSync().GetSession();
+ auto db = kikimr.GetTableClient();
+ auto session = db.CreateSession().GetValueSync().GetSession();
CreateTableWithIntKey(session, partitions, rangesPerPartition);
ExecuteStreamQueryAndCheck(db, query, expectedYson);
@@ -1220,7 +1215,6 @@ Y_UNIT_TEST_SUITE(KqpTablePredicate) {
Y_UNIT_TEST(NoFullScanAtDNFPredicate) {
TKikimrRunner kikimr;
- NExperimental::TStreamQueryClient streamDb(kikimr.GetDriver());
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();
@@ -1271,7 +1265,7 @@ Y_UNIT_TEST_SUITE(KqpTablePredicate) {
ORDER BY Value;
)");
SubstGlobal(query, "<PREDICATE>", data.first);
- ExecuteStreamQueryAndCheck(streamDb, query, data.second);
+ ExecuteStreamQueryAndCheck(db, query, data.second);
}
}