diff options
author | Nikita Vasilev <ns-vasilev@ydb.tech> | 2024-01-30 15:28:17 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-01-30 15:28:17 +0300 |
commit | 7c628c7b86cecdd277cb87453e3c733e4bc15943 (patch) | |
tree | a978d89649d1ab451cb877fcf140d97adf87e007 | |
parent | 658eb362cb3806ee45cbdb944efb6719651af4b3 (diff) | |
download | ydb-7c628c7b86cecdd277cb87453e3c733e4bc15943.tar.gz |
Blocks for first stages (#1404)
-rw-r--r-- | ydb/core/grpc_services/query/rpc_execute_query.cpp | 22 | ||||
-rw-r--r-- | ydb/core/kqp/host/kqp_runner.cpp | 11 | ||||
-rw-r--r-- | ydb/core/kqp/ut/common/kqp_ut_common.cpp | 11 | ||||
-rw-r--r-- | ydb/core/kqp/ut/olap/kqp_olap_ut.cpp | 153 |
4 files changed, 165 insertions, 32 deletions
diff --git a/ydb/core/grpc_services/query/rpc_execute_query.cpp b/ydb/core/grpc_services/query/rpc_execute_query.cpp index c5860551ee..88c3b86e56 100644 --- a/ydb/core/grpc_services/query/rpc_execute_query.cpp +++ b/ydb/core/grpc_services/query/rpc_execute_query.cpp @@ -176,6 +176,25 @@ bool NeedReportStats(const Ydb::Query::ExecuteQueryRequest& req) { } } +bool NeedReportAst(const Ydb::Query::ExecuteQueryRequest& req) { + switch (req.exec_mode()) { + case Ydb::Query::EXEC_MODE_EXPLAIN: + return true; + + case Ydb::Query::EXEC_MODE_EXECUTE: + switch (req.stats_mode()) { + case Ydb::Query::StatsMode::STATS_MODE_FULL: + case Ydb::Query::StatsMode::STATS_MODE_PROFILE: + return true; + default: + return false; + } + + default: + return false; + } +} + class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> { public: static constexpr NKikimrServices::TActivity::EType ActorActivityType() { @@ -382,6 +401,9 @@ private: if (NeedReportStats(*Request_->GetProtoRequest())) { hasTrailingMessage = true; FillQueryStats(*response.mutable_exec_stats(), kqpResponse); + if (NeedReportAst(*Request_->GetProtoRequest())) { + response.mutable_exec_stats()->set_query_ast(kqpResponse.GetQueryAst()); + } } if (hasTrailingMessage) { diff --git a/ydb/core/kqp/host/kqp_runner.cpp b/ydb/core/kqp/host/kqp_runner.cpp index 2925bd58a3..b73e6ebe86 100644 --- a/ydb/core/kqp/host/kqp_runner.cpp +++ b/ydb/core/kqp/host/kqp_runner.cpp @@ -138,6 +138,8 @@ public: : Gateway(gateway) , Cluster(cluster) , TypesCtx(*typesCtx) + , SessionCtx(sessionCtx) + , FunctionRegistry(funcRegistry) , Config(sessionCtx->ConfigPtr()) , TransformCtx(MakeIntrusive<TKqlTransformContext>(Config, sessionCtx->QueryPtr(), sessionCtx->TablesPtr())) , OptimizeCtx(MakeIntrusive<TKqpOptimizeContext>(cluster, Config, sessionCtx->QueryPtr(), @@ -192,14 +194,19 @@ public: YQL_ENSURE(IsIn({EKikimrQueryType::Query, EKikimrQueryType::Script}, TransformCtx->QueryCtx->Type)); YQL_ENSURE(TMaybeNode<TKiDataQueryBlocks>(query)); + TypesCtx.BlockEngineMode = NYql::EBlockEngineMode::Auto; + return PrepareQueryInternal(cluster, TKiDataQueryBlocks(query), ctx, settings); } private: + TIntrusivePtr<TAsyncQueryResult> PrepareQueryInternal(const TString& cluster, const TKiDataQueryBlocks& dataQueryBlocks, TExprContext& ctx, const IKikimrQueryExecutor::TExecuteSettings& settings) { + CreateGraphTransformer(&TypesCtx, SessionCtx, FunctionRegistry); + YQL_ENSURE(cluster == Cluster); YQL_ENSURE(!settings.CommitTx); YQL_ENSURE(!settings.RollbackTx); @@ -311,7 +318,7 @@ private: TAutoPtr<IGraphTransformer> compilePhysicalQuery(new TCompilePhysicalQueryTransformer(Cluster, *TransformCtx, *OptimizeCtx, - TypesCtx, + *typesCtx, funcRegistry, Config)); @@ -349,6 +356,8 @@ private: TIntrusivePtr<IKqpGateway> Gateway; TString Cluster; TTypeAnnotationContext& TypesCtx; + TIntrusivePtr<TKikimrSessionContext> SessionCtx; + const NMiniKQL::IFunctionRegistry& FunctionRegistry; TKikimrConfiguration::TPtr Config; TIntrusivePtr<TKqlTransformContext> TransformCtx; diff --git a/ydb/core/kqp/ut/common/kqp_ut_common.cpp b/ydb/core/kqp/ut/common/kqp_ut_common.cpp index 28022e3576..adea3a31a0 100644 --- a/ydb/core/kqp/ut/common/kqp_ut_common.cpp +++ b/ydb/core/kqp/ut/common/kqp_ut_common.cpp @@ -908,7 +908,16 @@ static void FillPlan(const NYdb::NScripting::TYqlResultPart& streamPart, TCollec } } -static void FillPlan(const NYdb::NQuery::TExecuteQueryPart& /*streamPart*/, TCollectedStreamResult& /*res*/) {} +static void FillPlan(const NYdb::NQuery::TExecuteQueryPart& streamPart, TCollectedStreamResult& res) { + if (streamPart.GetStats() ) { + res.QueryStats = NYdb::TProtoAccessor::GetProto(*streamPart.GetStats()); + + auto plan = res.QueryStats->query_plan(); + if (!plan.empty()) { + res.PlanJson = plan; + } + } +} template<typename TIterator> TCollectedStreamResult CollectStreamResultImpl(TIterator& it) { diff --git a/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp b/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp index 9f9a4701b6..d8d10c18fb 100644 --- a/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp +++ b/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp @@ -690,12 +690,27 @@ Y_UNIT_TEST_SUITE(KqpOlap) { }; } - void CheckPlanForAggregatePushdown(const TString& query, NYdb::NTable::TTableClient& tableClient, const std::vector<std::string>& planNodes, + template <typename TClient> + auto StreamExplainQuery(const TString& query, TClient& client) { + if constexpr (std::is_same_v<NYdb::NTable::TTableClient, TClient>) { + TStreamExecScanQuerySettings scanSettings; + scanSettings.Explain(true); + return client.StreamExecuteScanQuery(query, scanSettings).GetValueSync(); + } else { + NYdb::NQuery::TExecuteQuerySettings scanSettings; + scanSettings.ExecMode(NYdb::NQuery::EExecMode::Explain); + return client.StreamExecuteQuery(query, NYdb::NQuery::TTxControl::BeginTx().CommitTx(), scanSettings).GetValueSync(); + } + } + + template <typename TClient> + void CheckPlanForAggregatePushdown( + const TString& query, + TClient& client, + const std::vector<std::string>& expectedPlanNodes, const std::string& readNodeType) { - TStreamExecScanQuerySettings scanSettings; - scanSettings.Explain(true); - auto res = tableClient.StreamExecuteScanQuery(query, scanSettings).GetValueSync(); + auto res = StreamExplainQuery(query, client); UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString()); auto planRes = CollectStreamResult(res); @@ -704,7 +719,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) { Cerr << planRes.PlanJson.GetOrElse("NO_PLAN") << Endl; Cerr << "AST:" << Endl; Cerr << ast << Endl; - for (auto planNode : planNodes) { + for (auto planNode : expectedPlanNodes) { UNIT_ASSERT_C(ast.find(planNode) != std::string::npos, TStringBuilder() << planNode << " was not found. Query: " << query); } @@ -2437,10 +2452,12 @@ Y_UNIT_TEST_SUITE(KqpOlap) { ExpectedReply = value; return *this; } + TAggregationTestCase& AddExpectedPlanOptions(const std::string& value) { ExpectedPlanOptions.emplace_back(value); return *this; } + const std::vector<std::string>& GetExpectedPlanOptions() const { return ExpectedPlanOptions; } @@ -2548,7 +2565,28 @@ Y_UNIT_TEST_SUITE(KqpOlap) { TestAggregationsInternal(cases); } - void TestClickBenchBase(const std::vector<TAggregationTestCase>& cases) { + template <typename TClient> + auto StreamExecuteQuery(const TAggregationTestCase& testCase, TClient& client) { + if constexpr (std::is_same_v<NYdb::NTable::TTableClient, TClient>) { + return client.StreamExecuteScanQuery(testCase.GetFixedQuery()).GetValueSync(); + } else { + return client.StreamExecuteQuery( + testCase.GetFixedQuery(), + NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync(); + } + } + + template <typename TClient> + void RunTestCaseWithClient(const TAggregationTestCase& testCase, TClient& client) { + auto it = StreamExecuteQuery(testCase, client); + UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString()); + TString result = StreamResultToYson(it); + if (!testCase.GetExpectedReply().empty()) { + CompareYson(result, testCase.GetExpectedReply()); + } + } + + void TestClickBenchBase(const std::vector<TAggregationTestCase>& cases, const bool genericQuery) { auto settings = TKikimrSettings() .SetWithSampleTables(false) .SetForceColumnTablesCompositeMarks(true); @@ -2564,17 +2602,20 @@ Y_UNIT_TEST_SUITE(KqpOlap) { WriteTestDataForClickBench(kikimr, "/Root/benchTable", 0, 1000000 + i * 1000000, iterationPackSize); } - for (auto&& i : cases) { - const TString queryFixed = i.GetFixedQuery(); - { - auto it = tableClient.StreamExecuteScanQuery(queryFixed).GetValueSync(); - UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString()); - TString result = StreamResultToYson(it); - if (!i.GetExpectedReply().empty()) { - CompareYson(result, i.GetExpectedReply()); - } + if (!genericQuery) { + auto tableClient = kikimr.GetTableClient(); + for (auto&& i : cases) { + const TString queryFixed = i.GetFixedQuery(); + RunTestCaseWithClient(i, tableClient); + CheckPlanForAggregatePushdown(queryFixed, tableClient, i.GetExpectedPlanOptions(), i.GetExpectedReadNodeType()); + } + } else { + auto queryClient = kikimr.GetQueryClient(); + for (auto&& i : cases) { + const TString queryFixed = i.GetFixedQuery(); + RunTestCaseWithClient(i, queryClient); + CheckPlanForAggregatePushdown(queryFixed, queryClient, i.GetExpectedPlanOptions(), i.GetExpectedReadNodeType()); } - CheckPlanForAggregatePushdown(queryFixed, tableClient, i.GetExpectedPlanOptions(), i.GetExpectedReadNodeType()); } } @@ -2637,12 +2678,14 @@ Y_UNIT_TEST_SUITE(KqpOlap) { } } - void TestClickBench(const std::vector<TAggregationTestCase>& cases) { - TestClickBenchBase(cases); - TestClickBenchInternal(cases); + void TestClickBench(const std::vector<TAggregationTestCase>& cases, const bool genericQuery = false) { + TestClickBenchBase(cases, genericQuery); + if (!genericQuery) { + TestClickBenchInternal(cases); + } } - void TestTableWithNulls(const std::vector<TAggregationTestCase>& cases) { + void TestTableWithNulls(const std::vector<TAggregationTestCase>& cases, const bool genericQuery = false) { auto settings = TKikimrSettings() .SetWithSampleTables(false) .SetForceColumnTablesCompositeMarks(true); @@ -2656,17 +2699,20 @@ Y_UNIT_TEST_SUITE(KqpOlap) { WriteTestDataForTableWithNulls(kikimr, "/Root/tableWithNulls"); } - for (auto&& i : cases) { - const TString queryFixed = i.GetFixedQuery(); - { - auto it = tableClient.StreamExecuteScanQuery(queryFixed).GetValueSync(); - UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString()); - TString result = StreamResultToYson(it); - if (!i.GetExpectedReply().empty()) { - CompareYson(result, i.GetExpectedReply()); - } + if (!genericQuery) { + auto tableClient = kikimr.GetTableClient(); + for (auto&& i : cases) { + RunTestCaseWithClient(i, tableClient); + CheckPlanForAggregatePushdown(i.GetFixedQuery(), tableClient, + i.GetExpectedPlanOptions(), i.GetExpectedReadNodeType()); + } + } else { + auto queryClient = kikimr.GetQueryClient(); + for (auto&& i : cases) { + RunTestCaseWithClient(i, queryClient); + CheckPlanForAggregatePushdown(i.GetFixedQuery(), queryClient, + i.GetExpectedPlanOptions(), i.GetExpectedReadNodeType()); } - CheckPlanForAggregatePushdown(queryFixed, tableClient, i.GetExpectedPlanOptions(), i.GetExpectedReadNodeType()); } } @@ -5883,6 +5929,53 @@ Y_UNIT_TEST_SUITE(KqpOlap) { CompareYson(output, R"([])"); } } + + Y_UNIT_TEST(BlockGenericWithDistinct) { + TAggregationTestCase testCase; + testCase.SetQuery(R"( + SELECT + COUNT(DISTINCT id) + FROM `/Root/tableWithNulls` + WHERE level = 5 AND Cast(id AS String) = "5"; + )") + .AddExpectedPlanOptions("KqpBlockReadOlapTableRanges") + .AddExpectedPlanOptions("WideFromBlocks") + .SetExpectedReply("[[1u]]"); + TestTableWithNulls({ testCase }, /* generic */ true); + } + + Y_UNIT_TEST(BlockGenericSimpleAggregation) { + TAggregationTestCase testCase; + testCase.SetQuery(R"( + SELECT + level, COUNT(*), SUM(id) + FROM `/Root/tableWithNulls` + WHERE level = 5 + GROUP BY level + ORDER BY level; + )") + .AddExpectedPlanOptions("KqpBlockReadOlapTableRanges") + .AddExpectedPlanOptions("WideFromBlocks") + .SetExpectedReply(R"([[[5];1u;5]])"); + + TestTableWithNulls({ testCase }, /* generic */ true); + } + + Y_UNIT_TEST(BlockGenericSelectAll) { + TAggregationTestCase testCase; + testCase.SetQuery(R"( + SELECT + id, resource_id, level + FROM `/Root/tableWithNulls` + WHERE level != 5 OR level IS NULL + ORDER BY id, resource_id, level; + )") + .AddExpectedPlanOptions("KqpBlockReadOlapTableRanges") + .AddExpectedPlanOptions("WideFromBlocks") + .SetExpectedReply(R"([[1;#;[1]];[2;#;[2]];[3;#;[3]];[4;#;[4]];[6;["6"];#];[7;["7"];#];[8;["8"];#];[9;["9"];#];[10;["10"];#]])"); + + TestTableWithNulls({ testCase }, /* generic */ true); + } } } // namespace NKqp |