about | summary | refs | log | tree | commit | diff | stats
diff options
context:
space:
mode:
author Nikita Vasilev <ns-vasilev@ydb.tech> 2024-01-30 15:28:17 +0300
committer GitHub <noreply@github.com> 2024-01-30 15:28:17 +0300
commit 7c628c7b86cecdd277cb87453e3c733e4bc15943 (patch)
tree a978d89649d1ab451cb877fcf140d97adf87e007
parent 658eb362cb3806ee45cbdb944efb6719651af4b3 (diff)
download ydb-7c628c7b86cecdd277cb87453e3c733e4bc15943.tar.gz
Blocks for first stages (#1404)
-rw-r--r-- ydb/core/grpc_services/query/rpc_execute_query.cpp 22
-rw-r--r-- ydb/core/kqp/host/kqp_runner.cpp 11
-rw-r--r-- ydb/core/kqp/ut/common/kqp_ut_common.cpp 11
-rw-r--r-- ydb/core/kqp/ut/olap/kqp_olap_ut.cpp 153
4 files changed, 165 insertions, 32 deletions
diff --git a/ydb/core/grpc_services/query/rpc_execute_query.cpp b/ydb/core/grpc_services/query/rpc_execute_query.cpp
index c5860551ee..88c3b86e56 100644
--- a/ydb/core/grpc_services/query/rpc_execute_query.cpp
+++ b/ydb/core/grpc_services/query/rpc_execute_query.cpp
@@ -176,6 +176,25 @@ bool NeedReportStats(const Ydb::Query::ExecuteQueryRequest& req) {
}
}
+bool NeedReportAst(const Ydb::Query::ExecuteQueryRequest& req) {
+ switch (req.exec_mode()) {
+ case Ydb::Query::EXEC_MODE_EXPLAIN:
+ return true;
+
+ case Ydb::Query::EXEC_MODE_EXECUTE:
+ switch (req.stats_mode()) {
+ case Ydb::Query::StatsMode::STATS_MODE_FULL:
+ case Ydb::Query::StatsMode::STATS_MODE_PROFILE:
+ return true;
+ default:
+ return false;
+ }
+
+ default:
+ return false;
+ }
+}
+
class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {
public:
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
@@ -382,6 +401,9 @@ private:
if (NeedReportStats(*Request_->GetProtoRequest())) {
hasTrailingMessage = true;
FillQueryStats(*response.mutable_exec_stats(), kqpResponse);
+ if (NeedReportAst(*Request_->GetProtoRequest())) {
+ response.mutable_exec_stats()->set_query_ast(kqpResponse.GetQueryAst());
+ }
}
if (hasTrailingMessage) {
diff --git a/ydb/core/kqp/host/kqp_runner.cpp b/ydb/core/kqp/host/kqp_runner.cpp
index 2925bd58a3..b73e6ebe86 100644
--- a/ydb/core/kqp/host/kqp_runner.cpp
+++ b/ydb/core/kqp/host/kqp_runner.cpp
@@ -138,6 +138,8 @@ public:
: Gateway(gateway)
, Cluster(cluster)
, TypesCtx(*typesCtx)
+ , SessionCtx(sessionCtx)
+ , FunctionRegistry(funcRegistry)
, Config(sessionCtx->ConfigPtr())
, TransformCtx(MakeIntrusive<TKqlTransformContext>(Config, sessionCtx->QueryPtr(), sessionCtx->TablesPtr()))
, OptimizeCtx(MakeIntrusive<TKqpOptimizeContext>(cluster, Config, sessionCtx->QueryPtr(),
@@ -192,14 +194,19 @@ public:
YQL_ENSURE(IsIn({EKikimrQueryType::Query, EKikimrQueryType::Script}, TransformCtx->QueryCtx->Type));
YQL_ENSURE(TMaybeNode<TKiDataQueryBlocks>(query));
+ TypesCtx.BlockEngineMode = NYql::EBlockEngineMode::Auto;
+
return PrepareQueryInternal(cluster, TKiDataQueryBlocks(query), ctx, settings);
}
private:
+
TIntrusivePtr<TAsyncQueryResult> PrepareQueryInternal(const TString& cluster,
const TKiDataQueryBlocks& dataQueryBlocks, TExprContext& ctx,
const IKikimrQueryExecutor::TExecuteSettings& settings)
{
+ CreateGraphTransformer(&TypesCtx, SessionCtx, FunctionRegistry);
+
YQL_ENSURE(cluster == Cluster);
YQL_ENSURE(!settings.CommitTx);
YQL_ENSURE(!settings.RollbackTx);
@@ -311,7 +318,7 @@ private:
TAutoPtr<IGraphTransformer> compilePhysicalQuery(new TCompilePhysicalQueryTransformer(Cluster,
*TransformCtx,
*OptimizeCtx,
- TypesCtx,
+ *typesCtx,
funcRegistry,
Config));
@@ -349,6 +356,8 @@ private:
TIntrusivePtr<IKqpGateway> Gateway;
TString Cluster;
TTypeAnnotationContext& TypesCtx;
+ TIntrusivePtr<TKikimrSessionContext> SessionCtx;
+ const NMiniKQL::IFunctionRegistry& FunctionRegistry;
TKikimrConfiguration::TPtr Config;
TIntrusivePtr<TKqlTransformContext> TransformCtx;
diff --git a/ydb/core/kqp/ut/common/kqp_ut_common.cpp b/ydb/core/kqp/ut/common/kqp_ut_common.cpp
index 28022e3576..adea3a31a0 100644
--- a/ydb/core/kqp/ut/common/kqp_ut_common.cpp
+++ b/ydb/core/kqp/ut/common/kqp_ut_common.cpp
@@ -908,7 +908,16 @@ static void FillPlan(const NYdb::NScripting::TYqlResultPart& streamPart, TCollec
}
}
-static void FillPlan(const NYdb::NQuery::TExecuteQueryPart& /*streamPart*/, TCollectedStreamResult& /*res*/) {}
+static void FillPlan(const NYdb::NQuery::TExecuteQueryPart& streamPart, TCollectedStreamResult& res) {
+ if (streamPart.GetStats() ) {
+ res.QueryStats = NYdb::TProtoAccessor::GetProto(*streamPart.GetStats());
+
+ auto plan = res.QueryStats->query_plan();
+ if (!plan.empty()) {
+ res.PlanJson = plan;
+ }
+ }
+}
template<typename TIterator>
TCollectedStreamResult CollectStreamResultImpl(TIterator& it) {
diff --git a/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp b/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp
index 9f9a4701b6..d8d10c18fb 100644
--- a/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp
+++ b/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp
@@ -690,12 +690,27 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
};
}
- void CheckPlanForAggregatePushdown(const TString& query, NYdb::NTable::TTableClient& tableClient, const std::vector<std::string>& planNodes,
+ template <typename TClient>
+ auto StreamExplainQuery(const TString& query, TClient& client) {
+ if constexpr (std::is_same_v<NYdb::NTable::TTableClient, TClient>) {
+ TStreamExecScanQuerySettings scanSettings;
+ scanSettings.Explain(true);
+ return client.StreamExecuteScanQuery(query, scanSettings).GetValueSync();
+ } else {
+ NYdb::NQuery::TExecuteQuerySettings scanSettings;
+ scanSettings.ExecMode(NYdb::NQuery::EExecMode::Explain);
+ return client.StreamExecuteQuery(query, NYdb::NQuery::TTxControl::BeginTx().CommitTx(), scanSettings).GetValueSync();
+ }
+ }
+
+ template <typename TClient>
+ void CheckPlanForAggregatePushdown(
+ const TString& query,
+ TClient& client,
+ const std::vector<std::string>& expectedPlanNodes,
const std::string& readNodeType)
{
- TStreamExecScanQuerySettings scanSettings;
- scanSettings.Explain(true);
- auto res = tableClient.StreamExecuteScanQuery(query, scanSettings).GetValueSync();
+ auto res = StreamExplainQuery(query, client);
UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString());
auto planRes = CollectStreamResult(res);
@@ -704,7 +719,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
Cerr << planRes.PlanJson.GetOrElse("NO_PLAN") << Endl;
Cerr << "AST:" << Endl;
Cerr << ast << Endl;
- for (auto planNode : planNodes) {
+ for (auto planNode : expectedPlanNodes) {
UNIT_ASSERT_C(ast.find(planNode) != std::string::npos,
TStringBuilder() << planNode << " was not found. Query: " << query);
}
@@ -2437,10 +2452,12 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
ExpectedReply = value;
return *this;
}
+
TAggregationTestCase& AddExpectedPlanOptions(const std::string& value) {
ExpectedPlanOptions.emplace_back(value);
return *this;
}
+
const std::vector<std::string>& GetExpectedPlanOptions() const {
return ExpectedPlanOptions;
}
@@ -2548,7 +2565,28 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
TestAggregationsInternal(cases);
}
- void TestClickBenchBase(const std::vector<TAggregationTestCase>& cases) {
+ template <typename TClient>
+ auto StreamExecuteQuery(const TAggregationTestCase& testCase, TClient& client) {
+ if constexpr (std::is_same_v<NYdb::NTable::TTableClient, TClient>) {
+ return client.StreamExecuteScanQuery(testCase.GetFixedQuery()).GetValueSync();
+ } else {
+ return client.StreamExecuteQuery(
+ testCase.GetFixedQuery(),
+ NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync();
+ }
+ }
+
+ template <typename TClient>
+ void RunTestCaseWithClient(const TAggregationTestCase& testCase, TClient& client) {
+ auto it = StreamExecuteQuery(testCase, client);
+ UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString());
+ TString result = StreamResultToYson(it);
+ if (!testCase.GetExpectedReply().empty()) {
+ CompareYson(result, testCase.GetExpectedReply());
+ }
+ }
+
+ void TestClickBenchBase(const std::vector<TAggregationTestCase>& cases, const bool genericQuery) {
auto settings = TKikimrSettings()
.SetWithSampleTables(false)
.SetForceColumnTablesCompositeMarks(true);
@@ -2564,17 +2602,20 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
WriteTestDataForClickBench(kikimr, "/Root/benchTable", 0, 1000000 + i * 1000000, iterationPackSize);
}
- for (auto&& i : cases) {
- const TString queryFixed = i.GetFixedQuery();
- {
- auto it = tableClient.StreamExecuteScanQuery(queryFixed).GetValueSync();
- UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString());
- TString result = StreamResultToYson(it);
- if (!i.GetExpectedReply().empty()) {
- CompareYson(result, i.GetExpectedReply());
- }
+ if (!genericQuery) {
+ auto tableClient = kikimr.GetTableClient();
+ for (auto&& i : cases) {
+ const TString queryFixed = i.GetFixedQuery();
+ RunTestCaseWithClient(i, tableClient);
+ CheckPlanForAggregatePushdown(queryFixed, tableClient, i.GetExpectedPlanOptions(), i.GetExpectedReadNodeType());
+ }
+ } else {
+ auto queryClient = kikimr.GetQueryClient();
+ for (auto&& i : cases) {
+ const TString queryFixed = i.GetFixedQuery();
+ RunTestCaseWithClient(i, queryClient);
+ CheckPlanForAggregatePushdown(queryFixed, queryClient, i.GetExpectedPlanOptions(), i.GetExpectedReadNodeType());
}
- CheckPlanForAggregatePushdown(queryFixed, tableClient, i.GetExpectedPlanOptions(), i.GetExpectedReadNodeType());
}
}
@@ -2637,12 +2678,14 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
}
}
- void TestClickBench(const std::vector<TAggregationTestCase>& cases) {
- TestClickBenchBase(cases);
- TestClickBenchInternal(cases);
+ void TestClickBench(const std::vector<TAggregationTestCase>& cases, const bool genericQuery = false) {
+ TestClickBenchBase(cases, genericQuery);
+ if (!genericQuery) {
+ TestClickBenchInternal(cases);
+ }
}
- void TestTableWithNulls(const std::vector<TAggregationTestCase>& cases) {
+ void TestTableWithNulls(const std::vector<TAggregationTestCase>& cases, const bool genericQuery = false) {
auto settings = TKikimrSettings()
.SetWithSampleTables(false)
.SetForceColumnTablesCompositeMarks(true);
@@ -2656,17 +2699,20 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
WriteTestDataForTableWithNulls(kikimr, "/Root/tableWithNulls");
}
- for (auto&& i : cases) {
- const TString queryFixed = i.GetFixedQuery();
- {
- auto it = tableClient.StreamExecuteScanQuery(queryFixed).GetValueSync();
- UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString());
- TString result = StreamResultToYson(it);
- if (!i.GetExpectedReply().empty()) {
- CompareYson(result, i.GetExpectedReply());
- }
+ if (!genericQuery) {
+ auto tableClient = kikimr.GetTableClient();
+ for (auto&& i : cases) {
+ RunTestCaseWithClient(i, tableClient);
+ CheckPlanForAggregatePushdown(i.GetFixedQuery(), tableClient,
+ i.GetExpectedPlanOptions(), i.GetExpectedReadNodeType());
+ }
+ } else {
+ auto queryClient = kikimr.GetQueryClient();
+ for (auto&& i : cases) {
+ RunTestCaseWithClient(i, queryClient);
+ CheckPlanForAggregatePushdown(i.GetFixedQuery(), queryClient,
+ i.GetExpectedPlanOptions(), i.GetExpectedReadNodeType());
}
- CheckPlanForAggregatePushdown(queryFixed, tableClient, i.GetExpectedPlanOptions(), i.GetExpectedReadNodeType());
}
}
@@ -5883,6 +5929,53 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
CompareYson(output, R"([])");
}
}
+
+ Y_UNIT_TEST(BlockGenericWithDistinct) {
+ TAggregationTestCase testCase;
+ testCase.SetQuery(R"(
+ SELECT
+ COUNT(DISTINCT id)
+ FROM `/Root/tableWithNulls`
+ WHERE level = 5 AND Cast(id AS String) = "5";
+ )")
+ .AddExpectedPlanOptions("KqpBlockReadOlapTableRanges")
+ .AddExpectedPlanOptions("WideFromBlocks")
+ .SetExpectedReply("[[1u]]");
+ TestTableWithNulls({ testCase }, /* generic */ true);
+ }
+
+ Y_UNIT_TEST(BlockGenericSimpleAggregation) {
+ TAggregationTestCase testCase;
+ testCase.SetQuery(R"(
+ SELECT
+ level, COUNT(*), SUM(id)
+ FROM `/Root/tableWithNulls`
+ WHERE level = 5
+ GROUP BY level
+ ORDER BY level;
+ )")
+ .AddExpectedPlanOptions("KqpBlockReadOlapTableRanges")
+ .AddExpectedPlanOptions("WideFromBlocks")
+ .SetExpectedReply(R"([[[5];1u;5]])");
+
+ TestTableWithNulls({ testCase }, /* generic */ true);
+ }
+
+ Y_UNIT_TEST(BlockGenericSelectAll) {
+ TAggregationTestCase testCase;
+ testCase.SetQuery(R"(
+ SELECT
+ id, resource_id, level
+ FROM `/Root/tableWithNulls`
+ WHERE level != 5 OR level IS NULL
+ ORDER BY id, resource_id, level;
+ )")
+ .AddExpectedPlanOptions("KqpBlockReadOlapTableRanges")
+ .AddExpectedPlanOptions("WideFromBlocks")
+ .SetExpectedReply(R"([[1;#;[1]];[2;#;[2]];[3;#;[3]];[4;#;[4]];[6;["6"];#];[7;["7"];#];[8;["8"];#];[9;["9"];#];[10;["10"];#]])");
+
+ TestTableWithNulls({ testCase }, /* generic */ true);
+ }
}
} // namespace NKqp