aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorshumkovnd <shumkovnd@yandex-team.com>2023-07-10 14:46:30 +0300
committershumkovnd <shumkovnd@yandex-team.com>2023-07-10 14:46:30 +0300
commit3424811d47da3a8696b042c30ec54ec4c762600f (patch)
treecad61b2a920268164c553ef9d2a86b8d46718862
parent6f7cd3430b8b442a97a4485c917d188ef9633e19 (diff)
downloadydb-3424811d47da3a8696b042c30ec54ec4c762600f.tar.gz
KIKIMR-17412: add tests for stream yql query
-rw-r--r--ydb/core/kqp/ut/common/kqp_ut_common.cpp35
-rw-r--r--ydb/core/kqp/ut/common/kqp_ut_common.h3
-rw-r--r--ydb/core/kqp/ut/query/kqp_stats_ut.cpp125
3 files changed, 120 insertions, 43 deletions
diff --git a/ydb/core/kqp/ut/common/kqp_ut_common.cpp b/ydb/core/kqp/ut/common/kqp_ut_common.cpp
index 90c7ae8a48d..1d1248864f4 100644
--- a/ydb/core/kqp/ut/common/kqp_ut_common.cpp
+++ b/ydb/core/kqp/ut/common/kqp_ut_common.cpp
@@ -727,15 +727,25 @@ TCollectedStreamResult CollectStreamResultImpl(TIterator& it) {
if constexpr (std::is_same_v<TIterator, NYdb::NTable::TScanQueryPartIterator>) {
UNIT_ASSERT_C(streamPart.HasResultSet() || streamPart.HasQueryStats(),
"Unexpected empty scan query response.");
+
+ if (streamPart.HasResultSet()) {
+ auto resultSet = streamPart.ExtractResultSet();
+ PrintResultSet(resultSet, resultSetWriter);
+ res.RowsCount += resultSet.RowsCount();
+ }
}
- if (streamPart.HasResultSet()) {
- auto resultSet = streamPart.ExtractResultSet();
- PrintResultSet(resultSet, resultSetWriter);
- res.RowsCount += resultSet.RowsCount();
+ if constexpr (std::is_same_v<TIterator, NYdb::NScripting::TYqlResultPartIterator>) {
+ if (streamPart.HasPartialResult()) {
+ const auto& partialResult = streamPart.GetPartialResult();
+ const auto& resultSet = partialResult.GetResultSet();
+ PrintResultSet(resultSet, resultSetWriter);
+ res.RowsCount += resultSet.RowsCount();
+ }
}
- if constexpr (std::is_same_v<TIterator, NYdb::NTable::TScanQueryPartIterator>) {
+ if constexpr (std::is_same_v<TIterator, NYdb::NTable::TScanQueryPartIterator>
+ || std::is_same_v<TIterator, NYdb::NScripting::TYqlResultPartIterator>) {
if (streamPart.HasQueryStats() ) {
res.QueryStats = NYdb::TProtoAccessor::GetProto(streamPart.GetQueryStats());
@@ -757,10 +767,14 @@ TCollectedStreamResult CollectStreamResultImpl(TIterator& it) {
return res;
}
-TCollectedStreamResult CollectStreamResult(NYdb::NTable::TScanQueryPartIterator& it) {
+template<typename TIterator>
+TCollectedStreamResult CollectStreamResult(TIterator& it) {
return CollectStreamResultImpl(it);
}
+template TCollectedStreamResult CollectStreamResult(NYdb::NTable::TScanQueryPartIterator& it);
+template TCollectedStreamResult CollectStreamResult(NYdb::NScripting::TYqlResultPartIterator& it);
+
TString ReadTableToYson(NYdb::NTable::TSession session, const TString& table) {
TReadTableSettings settings;
settings.Ordered(true);
@@ -871,6 +885,15 @@ NJson::TJsonValue FindPlanNodeByKv(const NJson::TJsonValue& plan, const TString&
}
}
}
+
+ if (map.contains("queries")) {
+ for (const auto &node : map["queries"].GetArraySafe()) {
+ auto op = FindPlanNodeByKv(node, key, value);
+ if (op.IsDefined()) {
+ return op;
+ }
+ }
+ }
} else {
Y_ASSERT(false);
}
diff --git a/ydb/core/kqp/ut/common/kqp_ut_common.h b/ydb/core/kqp/ut/common/kqp_ut_common.h
index 726218d65e7..6d627ba3517 100644
--- a/ydb/core/kqp/ut/common/kqp_ut_common.h
+++ b/ydb/core/kqp/ut/common/kqp_ut_common.h
@@ -179,7 +179,8 @@ struct TCollectedStreamResult {
ui64 RowsCount = 0;
};
-TCollectedStreamResult CollectStreamResult(NYdb::NTable::TScanQueryPartIterator& it);
+template<typename TIterator>
+TCollectedStreamResult CollectStreamResult(TIterator& it);
enum class EIndexTypeSql {
Global,
diff --git a/ydb/core/kqp/ut/query/kqp_stats_ut.cpp b/ydb/core/kqp/ut/query/kqp_stats_ut.cpp
index b91955cb65c..e3f2c728512 100644
--- a/ydb/core/kqp/ut/query/kqp_stats_ut.cpp
+++ b/ydb/core/kqp/ut/query/kqp_stats_ut.cpp
@@ -4,6 +4,7 @@
#include <ydb/public/sdk/cpp/client/ydb_proto/accessor.h>
#include <ydb/library/yql/dq/actors/compute/dq_compute_actor.h>
+#include <ydb/public/sdk/cpp/client/draft/ydb_scripting.h>
#include <cstdlib>
@@ -15,17 +16,39 @@ using namespace NYdb::NTable;
Y_UNIT_TEST_SUITE(KqpStats) {
-Y_UNIT_TEST(MultiTxStatsFullExp) {
- auto kikimr = DefaultKikimrRunner();
+auto GetYqlStreamIterator(
+ TKikimrRunner& kikimr,
+ ECollectQueryStatsMode mode,
+ const TString& query) {
+ NYdb::NScripting::TExecuteYqlRequestSettings settings;
+ settings.CollectQueryStats(mode);
+
+ NYdb::NScripting::TScriptingClient client(kikimr.GetDriver());
+
+ auto it = client.StreamExecuteYqlScript(query, settings).GetValueSync();
+ return it;
+}
+
+auto GetScanStreamIterator(
+ TKikimrRunner& kikimr,
+ ECollectQueryStatsMode mode,
+ const TString& query) {
auto db = kikimr.GetTableClient();
TStreamExecScanQuerySettings settings;
- settings.CollectQueryStats(ECollectQueryStatsMode::Profile);
+ settings.CollectQueryStats(mode);
- auto it = db.StreamExecuteScanQuery(R"(
- SELECT * FROM `/Root/EightShard` WHERE Key BETWEEN 150 AND 266 ORDER BY Data LIMIT 4;
- )", settings).GetValueSync();
+ auto it = db.StreamExecuteScanQuery(query, settings).GetValueSync();
+ return it;
+}
+template <typename Iterator>
+void MultiTxStatsFullExp(
+ std::function<Iterator(TKikimrRunner&, ECollectQueryStatsMode, const TString&)> getIter) {
+ auto kikimr = DefaultKikimrRunner();
+ auto it = getIter(kikimr, ECollectQueryStatsMode::Profile, R"(
+ SELECT * FROM `/Root/EightShard` WHERE Key BETWEEN 150 AND 266 ORDER BY Data LIMIT 4;
+ )");
auto res = CollectStreamResult(it);
CompareYson(R"([
[[1];[202u];["Value2"]];
@@ -34,7 +57,6 @@ Y_UNIT_TEST(MultiTxStatsFullExp) {
])", res.ResultSetYson);
UNIT_ASSERT(res.PlanJson);
- Cerr << *res.PlanJson << Endl;
NJson::TJsonValue plan;
NJson::ReadJsonTree(*res.PlanJson, &plan, true);
auto node = FindPlanNodeByKv(plan, "Node Type", "TopSort-TableRangeScan");
@@ -44,16 +66,21 @@ Y_UNIT_TEST(MultiTxStatsFullExp) {
UNIT_ASSERT_EQUAL(node.GetMap().at("Stats").GetMapSafe().at("TotalTasks").GetIntegerSafe(), 2);
}
-Y_UNIT_TEST(JoinNoStats) {
- auto kikimr = DefaultKikimrRunner();
- auto db = kikimr.GetTableClient();
- TStreamExecScanQuerySettings settings;
- settings.CollectQueryStats(ECollectQueryStatsMode::None);
+Y_UNIT_TEST(MultiTxStatsFullExpYql) {
+ MultiTxStatsFullExp<NYdb::NScripting::TYqlResultPartIterator>(GetYqlStreamIterator);
+}
- auto it = db.StreamExecuteScanQuery(R"(
- SELECT count(*) FROM `/Root/EightShard` AS t JOIN `/Root/KeyValue` AS kv ON t.Data = kv.Key;
- )", settings).GetValueSync();
+Y_UNIT_TEST(MultiTxStatsFullExpScan) {
+ MultiTxStatsFullExp<NYdb::NTable::TScanQueryPartIterator>(GetScanStreamIterator);
+}
+template <typename Iterator>
+void JoinNoStats(
+ std::function<Iterator(TKikimrRunner&, ECollectQueryStatsMode, const TString&)> getIter) {
+ auto kikimr = DefaultKikimrRunner();
+ auto it = getIter(kikimr, ECollectQueryStatsMode::None, R"(
+ SELECT count(*) FROM `/Root/EightShard` AS t JOIN `/Root/KeyValue` AS kv ON t.Data = kv.Key;
+ )");
auto res = CollectStreamResult(it);
UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString());
UNIT_ASSERT_VALUES_EQUAL(res.ResultSetYson, "[[16u]]");
@@ -62,27 +89,50 @@ Y_UNIT_TEST(JoinNoStats) {
UNIT_ASSERT(!res.PlanJson);
}
-Y_UNIT_TEST(JoinStatsBasic) {
+Y_UNIT_TEST(JoinNoStatsYql) {
+ JoinNoStats<NYdb::NScripting::TYqlResultPartIterator>(GetYqlStreamIterator);
+}
+
+Y_UNIT_TEST(JoinNoStatsScan) {
+ JoinNoStats<NYdb::NTable::TScanQueryPartIterator>(GetScanStreamIterator);
+}
+
+template <typename Iterator>
+TCollectedStreamResult JoinStatsBasic(
+ std::function<Iterator(TKikimrRunner&, ECollectQueryStatsMode, const TString&)> getIter) {
NKikimrConfig::TAppConfig appConfig;
appConfig.MutableTableServiceConfig()->SetEnableKqpScanQueryStreamLookup(false);
auto settings = TKikimrSettings()
- .SetAppConfig(appConfig); // TODO: enable stream lookup KIKIMR-14294
-
+ .SetAppConfig(appConfig);
TKikimrRunner kikimr(settings);
- auto db = kikimr.GetTableClient();
- TStreamExecScanQuerySettings querySettings;
- querySettings.CollectQueryStats(ECollectQueryStatsMode::Basic);
- auto it = db.StreamExecuteScanQuery(R"(
+ auto it = getIter(kikimr, ECollectQueryStatsMode::Basic, R"(
SELECT count(*) FROM `/Root/EightShard` AS t JOIN `/Root/KeyValue` AS kv ON t.Data = kv.Key;
- )", querySettings).GetValueSync();
-
+ )");
auto res = CollectStreamResult(it);
UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString());
UNIT_ASSERT_VALUES_EQUAL(res.ResultSetYson, "[[16u]]");
UNIT_ASSERT(res.QueryStats);
+ return res;
+}
+
+Y_UNIT_TEST(JoinStatsBasicYql) {
+ auto res = JoinStatsBasic<NYdb::NScripting::TYqlResultPartIterator>(GetYqlStreamIterator);
+
+ UNIT_ASSERT_VALUES_EQUAL(res.QueryStats->query_phases().size(), 3);
+ if (res.QueryStats->query_phases(0).table_access(0).name() == "/Root/KeyValue") {
+ UNIT_ASSERT_VALUES_EQUAL(res.QueryStats->query_phases(2).table_access(0).name(), "/Root/EightShard");
+ } else {
+ UNIT_ASSERT_VALUES_EQUAL(res.QueryStats->query_phases(0).table_access(0).name(), "/Root/EightShard");
+ UNIT_ASSERT_VALUES_EQUAL(res.QueryStats->query_phases(2).table_access(0).name(), "/Root/KeyValue");
+ }
+}
+
+Y_UNIT_TEST(JoinStatsBasicScan) {
+ auto res = JoinStatsBasic<NYdb::NTable::TScanQueryPartIterator>(GetScanStreamIterator);
+
UNIT_ASSERT_VALUES_EQUAL(res.QueryStats->query_phases().size(), 2);
if (res.QueryStats->query_phases(0).table_access(0).name() == "/Root/KeyValue") {
UNIT_ASSERT_VALUES_EQUAL(res.QueryStats->query_phases(0).table_access(0).name(), "/Root/KeyValue");
@@ -99,17 +149,15 @@ Y_UNIT_TEST(JoinStatsBasic) {
UNIT_ASSERT(!res.PlanJson);
}
-Y_UNIT_TEST(MultiTxStatsFull) {
+template <typename Iterator>
+void MultiTxStatsFull(
+ std::function<Iterator(TKikimrRunner&, ECollectQueryStatsMode, const TString&)> getResult) {
auto kikimr = DefaultKikimrRunner();
- auto db = kikimr.GetTableClient();
- TStreamExecScanQuerySettings settings;
- settings.CollectQueryStats(ECollectQueryStatsMode::Full);
-
- auto it = db.StreamExecuteScanQuery(R"(
+ auto it = getResult(kikimr, ECollectQueryStatsMode::Full, R"(
SELECT * FROM `/Root/EightShard` WHERE Key BETWEEN 150 AND 266 ORDER BY Data LIMIT 4;
- )", settings).GetValueSync();
-
+ )");
auto res = CollectStreamResult(it);
+
UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString());
UNIT_ASSERT_VALUES_EQUAL(
res.ResultSetYson,
@@ -131,6 +179,14 @@ Y_UNIT_TEST(MultiTxStatsFull) {
UNIT_ASSERT_EQUAL(node.GetMap().at("Stats").GetMapSafe().at("TotalTasks").GetIntegerSafe(), 2);
}
+Y_UNIT_TEST(MultiTxStatsFullYql) {
+ MultiTxStatsFull<NYdb::NScripting::TYqlResultPartIterator>(GetYqlStreamIterator);
+}
+
+Y_UNIT_TEST(MultiTxStatsFullScan) {
+ MultiTxStatsFull<NYdb::NTable::TScanQueryPartIterator>(GetScanStreamIterator);
+}
+
Y_UNIT_TEST(DeferredEffects) {
auto kikimr = DefaultKikimrRunner();
auto db = kikimr.GetTableClient();
@@ -142,7 +198,6 @@ Y_UNIT_TEST(DeferredEffects) {
settings.CollectQueryStats(ECollectQueryStatsMode::Full);
auto result = session.ExecuteDataQuery(R"(
-
UPSERT INTO `/Root/TwoShard`
SELECT Key + 100u AS Key, Value1 FROM `/Root/TwoShard` WHERE Key in (1,2,3,4,5);
)", TTxControl::BeginTx(), settings).ExtractValueSync();
@@ -204,7 +259,6 @@ Y_UNIT_TEST(DataQueryWithEffects) {
settings.CollectQueryStats(ECollectQueryStatsMode::Full);
auto result = session.ExecuteDataQuery(R"(
-
UPSERT INTO `/Root/TwoShard`
SELECT Key + 1u AS Key, Value1 FROM `/Root/TwoShard`;
)", TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(), settings).ExtractValueSync();
@@ -227,7 +281,6 @@ Y_UNIT_TEST(DataQueryMulti) {
settings.CollectQueryStats(ECollectQueryStatsMode::Full);
auto result = session.ExecuteDataQuery(R"(
-
SELECT 1;
SELECT 2;
SELECT 3;
@@ -368,7 +421,7 @@ Y_UNIT_TEST(StreamLookupStats) {
NJson::ReadJsonTree(result.GetQueryPlan(), &plan, true);
auto streamLookup = FindPlanNodeByKv(plan, "Node Type", "TableLookup");
UNIT_ASSERT(streamLookup.IsDefined());
-
+
auto& stats = NYdb::TProtoAccessor::GetProto(*result.GetStats());
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 2);
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(1).affected_shards(), 1);