diff options
author | shumkovnd <shumkovnd@yandex-team.com> | 2023-07-10 14:46:30 +0300 |
---|---|---|
committer | shumkovnd <shumkovnd@yandex-team.com> | 2023-07-10 14:46:30 +0300 |
commit | 3424811d47da3a8696b042c30ec54ec4c762600f (patch) | |
tree | cad61b2a920268164c553ef9d2a86b8d46718862 | |
parent | 6f7cd3430b8b442a97a4485c917d188ef9633e19 (diff) | |
download | ydb-3424811d47da3a8696b042c30ec54ec4c762600f.tar.gz |
KIKIMR-17412: add tests for stream yql query
-rw-r--r-- | ydb/core/kqp/ut/common/kqp_ut_common.cpp | 35 | ||||
-rw-r--r-- | ydb/core/kqp/ut/common/kqp_ut_common.h | 3 | ||||
-rw-r--r-- | ydb/core/kqp/ut/query/kqp_stats_ut.cpp | 125 |
3 files changed, 120 insertions, 43 deletions
diff --git a/ydb/core/kqp/ut/common/kqp_ut_common.cpp b/ydb/core/kqp/ut/common/kqp_ut_common.cpp index 90c7ae8a48d..1d1248864f4 100644 --- a/ydb/core/kqp/ut/common/kqp_ut_common.cpp +++ b/ydb/core/kqp/ut/common/kqp_ut_common.cpp @@ -727,15 +727,25 @@ TCollectedStreamResult CollectStreamResultImpl(TIterator& it) { if constexpr (std::is_same_v<TIterator, NYdb::NTable::TScanQueryPartIterator>) { UNIT_ASSERT_C(streamPart.HasResultSet() || streamPart.HasQueryStats(), "Unexpected empty scan query response."); + + if (streamPart.HasResultSet()) { + auto resultSet = streamPart.ExtractResultSet(); + PrintResultSet(resultSet, resultSetWriter); + res.RowsCount += resultSet.RowsCount(); + } } - if (streamPart.HasResultSet()) { - auto resultSet = streamPart.ExtractResultSet(); - PrintResultSet(resultSet, resultSetWriter); - res.RowsCount += resultSet.RowsCount(); + if constexpr (std::is_same_v<TIterator, NYdb::NScripting::TYqlResultPartIterator>) { + if (streamPart.HasPartialResult()) { + const auto& partialResult = streamPart.GetPartialResult(); + const auto& resultSet = partialResult.GetResultSet(); + PrintResultSet(resultSet, resultSetWriter); + res.RowsCount += resultSet.RowsCount(); + } } - if constexpr (std::is_same_v<TIterator, NYdb::NTable::TScanQueryPartIterator>) { + if constexpr (std::is_same_v<TIterator, NYdb::NTable::TScanQueryPartIterator> + || std::is_same_v<TIterator, NYdb::NScripting::TYqlResultPartIterator>) { if (streamPart.HasQueryStats() ) { res.QueryStats = NYdb::TProtoAccessor::GetProto(streamPart.GetQueryStats()); @@ -757,10 +767,14 @@ TCollectedStreamResult CollectStreamResultImpl(TIterator& it) { return res; } -TCollectedStreamResult CollectStreamResult(NYdb::NTable::TScanQueryPartIterator& it) { +template<typename TIterator> +TCollectedStreamResult CollectStreamResult(TIterator& it) { return CollectStreamResultImpl(it); } +template TCollectedStreamResult CollectStreamResult(NYdb::NTable::TScanQueryPartIterator& it); +template TCollectedStreamResult CollectStreamResult(NYdb::NScripting::TYqlResultPartIterator& it); + TString ReadTableToYson(NYdb::NTable::TSession session, const TString& table) { TReadTableSettings settings; settings.Ordered(true); @@ -871,6 +885,15 @@ NJson::TJsonValue FindPlanNodeByKv(const NJson::TJsonValue& plan, const TString& } } } + + if (map.contains("queries")) { + for (const auto &node : map["queries"].GetArraySafe()) { + auto op = FindPlanNodeByKv(node, key, value); + if (op.IsDefined()) { + return op; + } + } + } } else { Y_ASSERT(false); } diff --git a/ydb/core/kqp/ut/common/kqp_ut_common.h b/ydb/core/kqp/ut/common/kqp_ut_common.h index 726218d65e7..6d627ba3517 100644 --- a/ydb/core/kqp/ut/common/kqp_ut_common.h +++ b/ydb/core/kqp/ut/common/kqp_ut_common.h @@ -179,7 +179,8 @@ struct TCollectedStreamResult { ui64 RowsCount = 0; }; -TCollectedStreamResult CollectStreamResult(NYdb::NTable::TScanQueryPartIterator& it); +template<typename TIterator> +TCollectedStreamResult CollectStreamResult(TIterator& it); enum class EIndexTypeSql { Global, diff --git a/ydb/core/kqp/ut/query/kqp_stats_ut.cpp b/ydb/core/kqp/ut/query/kqp_stats_ut.cpp index b91955cb65c..e3f2c728512 100644 --- a/ydb/core/kqp/ut/query/kqp_stats_ut.cpp +++ b/ydb/core/kqp/ut/query/kqp_stats_ut.cpp @@ -4,6 +4,7 @@ #include <ydb/public/sdk/cpp/client/ydb_proto/accessor.h> #include <ydb/library/yql/dq/actors/compute/dq_compute_actor.h> +#include <ydb/public/sdk/cpp/client/draft/ydb_scripting.h> #include <cstdlib> @@ -15,17 +16,39 @@ using namespace NYdb::NTable; Y_UNIT_TEST_SUITE(KqpStats) { -Y_UNIT_TEST(MultiTxStatsFullExp) { - auto kikimr = DefaultKikimrRunner(); +auto GetYqlStreamIterator( + TKikimrRunner& kikimr, + ECollectQueryStatsMode mode, + const TString& query) { + NYdb::NScripting::TExecuteYqlRequestSettings settings; + settings.CollectQueryStats(mode); + + NYdb::NScripting::TScriptingClient client(kikimr.GetDriver()); + + auto it = client.StreamExecuteYqlScript(query, settings).GetValueSync(); + return it; +} + +auto GetScanStreamIterator( + TKikimrRunner& kikimr, + ECollectQueryStatsMode mode, + const TString& query) { auto db = kikimr.GetTableClient(); TStreamExecScanQuerySettings settings; - settings.CollectQueryStats(ECollectQueryStatsMode::Profile); + settings.CollectQueryStats(mode); - auto it = db.StreamExecuteScanQuery(R"( - SELECT * FROM `/Root/EightShard` WHERE Key BETWEEN 150 AND 266 ORDER BY Data LIMIT 4; - )", settings).GetValueSync(); + auto it = db.StreamExecuteScanQuery(query, settings).GetValueSync(); + return it; +} +template <typename Iterator> +void MultiTxStatsFullExp( + std::function<Iterator(TKikimrRunner&, ECollectQueryStatsMode, const TString&)> getIter) { + auto kikimr = DefaultKikimrRunner(); + auto it = getIter(kikimr, ECollectQueryStatsMode::Profile, R"( + SELECT * FROM `/Root/EightShard` WHERE Key BETWEEN 150 AND 266 ORDER BY Data LIMIT 4; + )"); auto res = CollectStreamResult(it); CompareYson(R"([ [[1];[202u];["Value2"]]; @@ -34,7 +57,6 @@ Y_UNIT_TEST(MultiTxStatsFullExp) { ])", res.ResultSetYson); UNIT_ASSERT(res.PlanJson); - Cerr << *res.PlanJson << Endl; NJson::TJsonValue plan; NJson::ReadJsonTree(*res.PlanJson, &plan, true); auto node = FindPlanNodeByKv(plan, "Node Type", "TopSort-TableRangeScan"); @@ -44,16 +66,21 @@ Y_UNIT_TEST(MultiTxStatsFullExp) { UNIT_ASSERT_EQUAL(node.GetMap().at("Stats").GetMapSafe().at("TotalTasks").GetIntegerSafe(), 2); } -Y_UNIT_TEST(JoinNoStats) { - auto kikimr = DefaultKikimrRunner(); - auto db = kikimr.GetTableClient(); - TStreamExecScanQuerySettings settings; - settings.CollectQueryStats(ECollectQueryStatsMode::None); +Y_UNIT_TEST(MultiTxStatsFullExpYql) { + MultiTxStatsFullExp<NYdb::NScripting::TYqlResultPartIterator>(GetYqlStreamIterator); +} - auto it = db.StreamExecuteScanQuery(R"( - SELECT count(*) FROM `/Root/EightShard` AS t JOIN `/Root/KeyValue` AS kv ON t.Data = kv.Key; - )", settings).GetValueSync(); +Y_UNIT_TEST(MultiTxStatsFullExpScan) { + MultiTxStatsFullExp<NYdb::NTable::TScanQueryPartIterator>(GetScanStreamIterator); +} +template <typename Iterator> +void JoinNoStats( + std::function<Iterator(TKikimrRunner&, ECollectQueryStatsMode, const TString&)> getIter) { + auto kikimr = DefaultKikimrRunner(); + auto it = getIter(kikimr, ECollectQueryStatsMode::None, R"( + SELECT count(*) FROM `/Root/EightShard` AS t JOIN `/Root/KeyValue` AS kv ON t.Data = kv.Key; + )"); auto res = CollectStreamResult(it); UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString()); UNIT_ASSERT_VALUES_EQUAL(res.ResultSetYson, "[[16u]]"); @@ -62,27 +89,50 @@ Y_UNIT_TEST(JoinNoStats) { UNIT_ASSERT(!res.PlanJson); } -Y_UNIT_TEST(JoinStatsBasic) { +Y_UNIT_TEST(JoinNoStatsYql) { + JoinNoStats<NYdb::NScripting::TYqlResultPartIterator>(GetYqlStreamIterator); +} + +Y_UNIT_TEST(JoinNoStatsScan) { + JoinNoStats<NYdb::NTable::TScanQueryPartIterator>(GetScanStreamIterator); +} + +template <typename Iterator> +TCollectedStreamResult JoinStatsBasic( + std::function<Iterator(TKikimrRunner&, ECollectQueryStatsMode, const TString&)> getIter) { NKikimrConfig::TAppConfig appConfig; appConfig.MutableTableServiceConfig()->SetEnableKqpScanQueryStreamLookup(false); auto settings = TKikimrSettings() - .SetAppConfig(appConfig); // TODO: enable stream lookup KIKIMR-14294 - + .SetAppConfig(appConfig); TKikimrRunner kikimr(settings); - auto db = kikimr.GetTableClient(); - TStreamExecScanQuerySettings querySettings; - querySettings.CollectQueryStats(ECollectQueryStatsMode::Basic); - auto it = db.StreamExecuteScanQuery(R"( + auto it = getIter(kikimr, ECollectQueryStatsMode::Basic, R"( SELECT count(*) FROM `/Root/EightShard` AS t JOIN `/Root/KeyValue` AS kv ON t.Data = kv.Key; - )", querySettings).GetValueSync(); - + )"); auto res = CollectStreamResult(it); UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString()); UNIT_ASSERT_VALUES_EQUAL(res.ResultSetYson, "[[16u]]"); UNIT_ASSERT(res.QueryStats); + return res; +} + +Y_UNIT_TEST(JoinStatsBasicYql) { + auto res = JoinStatsBasic<NYdb::NScripting::TYqlResultPartIterator>(GetYqlStreamIterator); + + UNIT_ASSERT_VALUES_EQUAL(res.QueryStats->query_phases().size(), 3); + if (res.QueryStats->query_phases(0).table_access(0).name() == "/Root/KeyValue") { + UNIT_ASSERT_VALUES_EQUAL(res.QueryStats->query_phases(2).table_access(0).name(), "/Root/EightShard"); + } else { + UNIT_ASSERT_VALUES_EQUAL(res.QueryStats->query_phases(0).table_access(0).name(), "/Root/EightShard"); + UNIT_ASSERT_VALUES_EQUAL(res.QueryStats->query_phases(2).table_access(0).name(), "/Root/KeyValue"); + } +} + +Y_UNIT_TEST(JoinStatsBasicScan) { + auto res = JoinStatsBasic<NYdb::NTable::TScanQueryPartIterator>(GetScanStreamIterator); + UNIT_ASSERT_VALUES_EQUAL(res.QueryStats->query_phases().size(), 2); if (res.QueryStats->query_phases(0).table_access(0).name() == "/Root/KeyValue") { UNIT_ASSERT_VALUES_EQUAL(res.QueryStats->query_phases(0).table_access(0).name(), "/Root/KeyValue"); @@ -99,17 +149,15 @@ Y_UNIT_TEST(JoinStatsBasic) { UNIT_ASSERT(!res.PlanJson); } -Y_UNIT_TEST(MultiTxStatsFull) { +template <typename Iterator> +void MultiTxStatsFull( + std::function<Iterator(TKikimrRunner&, ECollectQueryStatsMode, const TString&)> getResult) { auto kikimr = DefaultKikimrRunner(); - auto db = kikimr.GetTableClient(); - TStreamExecScanQuerySettings settings; - settings.CollectQueryStats(ECollectQueryStatsMode::Full); - - auto it = db.StreamExecuteScanQuery(R"( + auto it = getResult(kikimr, ECollectQueryStatsMode::Full, R"( SELECT * FROM `/Root/EightShard` WHERE Key BETWEEN 150 AND 266 ORDER BY Data LIMIT 4; - )", settings).GetValueSync(); - + )"); auto res = CollectStreamResult(it); + UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString()); UNIT_ASSERT_VALUES_EQUAL( res.ResultSetYson, @@ -131,6 +179,14 @@ Y_UNIT_TEST(MultiTxStatsFull) { UNIT_ASSERT_EQUAL(node.GetMap().at("Stats").GetMapSafe().at("TotalTasks").GetIntegerSafe(), 2); } +Y_UNIT_TEST(MultiTxStatsFullYql) { + MultiTxStatsFull<NYdb::NScripting::TYqlResultPartIterator>(GetYqlStreamIterator); +} + +Y_UNIT_TEST(MultiTxStatsFullScan) { + MultiTxStatsFull<NYdb::NTable::TScanQueryPartIterator>(GetScanStreamIterator); +} + Y_UNIT_TEST(DeferredEffects) { auto kikimr = DefaultKikimrRunner(); auto db = kikimr.GetTableClient(); @@ -142,7 +198,6 @@ Y_UNIT_TEST(DeferredEffects) { settings.CollectQueryStats(ECollectQueryStatsMode::Full); auto result = session.ExecuteDataQuery(R"( - UPSERT INTO `/Root/TwoShard` SELECT Key + 100u AS Key, Value1 FROM `/Root/TwoShard` WHERE Key in (1,2,3,4,5); )", TTxControl::BeginTx(), settings).ExtractValueSync(); @@ -204,7 +259,6 @@ Y_UNIT_TEST(DataQueryWithEffects) { settings.CollectQueryStats(ECollectQueryStatsMode::Full); auto result = session.ExecuteDataQuery(R"( - UPSERT INTO `/Root/TwoShard` SELECT Key + 1u AS Key, Value1 FROM `/Root/TwoShard`; )", TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(), settings).ExtractValueSync(); @@ -227,7 +281,6 @@ Y_UNIT_TEST(DataQueryMulti) { settings.CollectQueryStats(ECollectQueryStatsMode::Full); auto result = session.ExecuteDataQuery(R"( - SELECT 1; SELECT 2; SELECT 3; @@ -368,7 +421,7 @@ Y_UNIT_TEST(StreamLookupStats) { NJson::ReadJsonTree(result.GetQueryPlan(), &plan, true); auto streamLookup = FindPlanNodeByKv(plan, "Node Type", "TableLookup"); UNIT_ASSERT(streamLookup.IsDefined()); - + auto& stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 2); UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(1).affected_shards(), 1); |