diff options
author | aidarsamer <aidarsamer@ydb.tech> | 2022-10-13 13:08:52 +0300 |
---|---|---|
committer | aidarsamer <aidarsamer@ydb.tech> | 2022-10-13 13:08:52 +0300 |
commit | 7d2fae9eb08cb6452e33826ebd9cd19c4d1ea776 (patch) | |
tree | 01875c5200b4eb0eaebd48cff614052305ddde78 | |
parent | 9cc5d1d5dbe564b03d97a801aa7ad883628ee531 (diff) | |
download | ydb-7d2fae9eb08cb6452e33826ebd9cd19c4d1ea776.tar.gz |
Fix limit and ExtractMembers pushdown to column shards.
Fix limit and ExtractMemers pushdown to column shard
-rw-r--r-- | ydb/core/kqp/compile/kqp_olap_compiler.cpp | 14 | ||||
-rw-r--r-- | ydb/core/kqp/expr_nodes/kqp_expr_nodes.json | 8 | ||||
-rw-r--r-- | ydb/core/kqp/opt/logical/kqp_opt_log_extract.cpp | 56 | ||||
-rw-r--r-- | ydb/core/kqp/prepare/kqp_type_ann.cpp | 41 | ||||
-rw-r--r-- | ydb/core/kqp/ut/kqp_olap_ut.cpp | 23 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/columnshard__stats_scan.h | 5 | ||||
-rw-r--r-- | ydb/library/yql/core/type_ann/type_ann_core.cpp | 22 | ||||
-rw-r--r-- | ydb/library/yql/core/type_ann/type_ann_impl.h | 2 | ||||
-rw-r--r-- | ydb/library/yql/core/type_ann/type_ann_list.cpp | 8 |
9 files changed, 140 insertions, 39 deletions
diff --git a/ydb/core/kqp/compile/kqp_olap_compiler.cpp b/ydb/core/kqp/compile/kqp_olap_compiler.cpp index 351110d4767..dcac5ac61c2 100644 --- a/ydb/core/kqp/compile/kqp_olap_compiler.cpp +++ b/ydb/core/kqp/compile/kqp_olap_compiler.cpp @@ -407,6 +407,17 @@ void CompileAggregates(const TKqpOlapAgg& aggNode, TKqpOlapCompileContext& ctx) } } +void CompileProjection(const TKqpOlapExtractMembers& extractMembers, TKqpOlapCompileContext& ctx) { + auto* projection = ctx.CreateProjection(); + for (auto col : extractMembers.Members()) { + auto colName = col.StringValue(); + auto colId = GetOrCreateColumnId(col, ctx); + + auto* projCol = projection->AddColumns(); + projCol->SetId(colId); + } +} + void CompileOlapProgramImpl(TExprBase operation, TKqpOlapCompileContext& ctx) { if (operation.Raw() == ctx.GetRowExpr()) { return; @@ -420,6 +431,9 @@ void CompileOlapProgramImpl(TExprBase operation, TKqpOlapCompileContext& ctx) { } else if (auto maybeAgg = operation.Maybe<TKqpOlapAgg>()) { CompileAggregates(maybeAgg.Cast(), ctx); return; + } else if (auto maybeExtractMembers = operation.Maybe<TKqpOlapExtractMembers>()) { + CompileProjection(maybeExtractMembers.Cast(), ctx); + return; } } diff --git a/ydb/core/kqp/expr_nodes/kqp_expr_nodes.json b/ydb/core/kqp/expr_nodes/kqp_expr_nodes.json index 6c3ebee989e..9b5d293456e 100644 --- a/ydb/core/kqp/expr_nodes/kqp_expr_nodes.json +++ b/ydb/core/kqp/expr_nodes/kqp_expr_nodes.json @@ -465,6 +465,14 @@ ] }, { + "Name": "TKqpOlapExtractMembers", + "Base": "TKqpOlapOperationBase", + "Match": {"Type": "Callable", "Name": "TKqpOlapExtractMembers"}, + "Children": [ + {"Index": 1, "Name": "Members", "Type": "TCoAtomList"} + ] + }, + { "Name": "TKqpEnsure", "Base": "TCallable", "Match": {"Type": "Callable", "Name": "KqpEnsure"}, diff --git a/ydb/core/kqp/opt/logical/kqp_opt_log_extract.cpp b/ydb/core/kqp/opt/logical/kqp_opt_log_extract.cpp index 1234ee976ed..ba6f77981af 100644 --- a/ydb/core/kqp/opt/logical/kqp_opt_log_extract.cpp +++ b/ydb/core/kqp/opt/logical/kqp_opt_log_extract.cpp @@ -131,27 +131,51 @@ TExprBase KqpApplyExtractMembersToReadOlapTable(TExprBase node, TExprContext& ct auto read = node.Cast<TKqpReadOlapTableRangesBase>(); - // When process is set it may use columns in read.Columns() but those columns may not be present - // in the results. Thus do not apply extract members if process is not empty lambda - // TODO: Support process lambda in this rule. - if (read.Process().Body().Raw() != read.Process().Args().Arg(0).Raw()) { - return node; - } - auto usedColumns = GetUsedColumns(read, read.Columns(), parentsMap, allowMultiUsage, ctx); if (!usedColumns) { return node; } - return Build<TKqpReadOlapTableRangesBase>(ctx, read.Pos()) - .CallableName(read.CallableName()) - .Table(read.Table()) - .Ranges(read.Ranges()) - .Columns(usedColumns.Cast()) - .Settings(read.Settings()) - .ExplainPrompt(read.ExplainPrompt()) - .Process(read.Process()) - .Done(); + // When process is set it may use columns in read.Columns() but those columns may not be present + // in the results. Thus do not apply extract members if process is not empty lambda + if (read.Process().Body().Raw() != read.Process().Args().Arg(0).Raw()) { + auto extractMembers = Build<TKqpOlapExtractMembers>(ctx, node.Pos()) + .Input(read.Process().Args().Arg(0)) + .Members(usedColumns.Cast()) + .Done(); + + auto extractMembersLambda = Build<TCoLambda>(ctx, node.Pos()) + .Args({"row"}) + .Body<TExprApplier>() + .Apply(extractMembers) + .With(read.Process().Args().Arg(0), "row") + .Build() + .Done(); + + auto newProcessLambda = ctx.FuseLambdas(extractMembersLambda.Ref(), read.Process().Ref()); + + YQL_CLOG(INFO, ProviderKqp) << "Pushed ExtractMembers lambda: " << KqpExprToPrettyString(*newProcessLambda, ctx); + + return Build<TKqpReadOlapTableRangesBase>(ctx, read.Pos()) + .CallableName(read.CallableName()) + .Table(read.Table()) + .Ranges(read.Ranges()) + .Columns(read.Columns()) + .Settings(read.Settings()) + .ExplainPrompt(read.ExplainPrompt()) + .Process(newProcessLambda) + .Done(); + } else { + return Build<TKqpReadOlapTableRangesBase>(ctx, read.Pos()) + .CallableName(read.CallableName()) + .Table(read.Table()) + .Ranges(read.Ranges()) + .Columns(usedColumns.Cast()) + .Settings(read.Settings()) + .ExplainPrompt(read.ExplainPrompt()) + .Process(read.Process()) + .Done(); + } } TExprBase KqpApplyExtractMembersToLookupTable(TExprBase node, TExprContext& ctx, const TParentsMap& parentsMap, diff --git a/ydb/core/kqp/prepare/kqp_type_ann.cpp b/ydb/core/kqp/prepare/kqp_type_ann.cpp index bfa6d038d4a..80afc03b442 100644 --- a/ydb/core/kqp/prepare/kqp_type_ann.cpp +++ b/ydb/core/kqp/prepare/kqp_type_ann.cpp @@ -4,6 +4,7 @@ #include <ydb/core/kqp/provider/yql_kikimr_provider_impl.h> #include <ydb/library/yql/core/type_ann/type_ann_core.h> +#include "ydb/library/yql/core/type_ann/type_ann_impl.h" #include <ydb/library/yql/core/yql_opt_utils.h> #include <ydb/library/yql/dq/type_ann/dq_type_ann.h> #include <ydb/library/yql/utils/log/log.h> @@ -920,6 +921,42 @@ TStatus AnnotateOlapAgg(const TExprNode::TPtr& node, TExprContext& ctx) { return TStatus::Ok; } + +TStatus AnnotateOlapExtractMembers(const TExprNode::TPtr& node, TExprContext& ctx) { + if (!EnsureArgsCount(*node, 2, ctx)) { + return IGraphTransformer::TStatus::Error; + } + + const TTypeAnnotationNode* nodeItemType = nullptr; + if (!EnsureNewSeqType<true>(node->Head(), ctx, &nodeItemType)) { + return IGraphTransformer::TStatus::Error; + } + + if (!EnsureStructType(node->Head().Pos(), *nodeItemType, ctx)) { + return IGraphTransformer::TStatus::Error; + } + + const auto structType = nodeItemType->Cast<TStructExprType>(); + TVector<const TItemExprType*> resItems; + for (auto& x : node->Tail().Children()) { + YQL_ENSURE(x->IsAtom()); + auto pos = NYql::NTypeAnnImpl::FindOrReportMissingMember(x->Content(), node->Head().Pos(), *structType, ctx); + if (!pos) { + return IGraphTransformer::TStatus::Error; + } + + resItems.push_back(structType->GetItems()[*pos]); + } + + const auto resItemType = ctx.MakeType<TStructExprType>(resItems); + if (!resItemType->Validate(node->Pos(), ctx)) { + return IGraphTransformer::TStatus::Error; + } + + node->SetTypeAnn(MakeSequenceType(node->Head().GetTypeAnn()->GetKind(), *resItemType, ctx)); + return IGraphTransformer::TStatus::Ok; +} + TStatus AnnotateKqpTxInternalBinding(const TExprNode::TPtr& node, TExprContext& ctx) { if (!EnsureArgsCount(*node, 2, ctx)) { return TStatus::Error; @@ -1237,6 +1274,10 @@ TAutoPtr<IGraphTransformer> CreateKqpTypeAnnotationTransformer(const TString& cl return AnnotateOlapAgg(input, ctx); } + if (TKqpOlapExtractMembers::Match(input.Get())) { + return AnnotateOlapExtractMembers(input, ctx); + } + if (TKqpCnMapShard::Match(input.Get()) || TKqpCnShuffleShard::Match(input.Get())) { return AnnotateDqConnection(input, ctx); } diff --git a/ydb/core/kqp/ut/kqp_olap_ut.cpp b/ydb/core/kqp/ut/kqp_olap_ut.cpp index 1c290d31b4a..79d26ffb590 100644 --- a/ydb/core/kqp/ut/kqp_olap_ut.cpp +++ b/ydb/core/kqp/ut/kqp_olap_ut.cpp @@ -1751,7 +1751,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) { } Y_UNIT_TEST(Aggregation_ResultCountAll_FilterL) { - //https://st.yandex-team.ru/KIKIMR-15900 + //https://st.yandex-team.ru/KIKIMR-16073 return; TAggregationTestCase testCase; testCase.SetQuery(R"( @@ -1834,8 +1834,6 @@ Y_UNIT_TEST_SUITE(KqpOlap) { } Y_UNIT_TEST(Aggregation_ResultT_FilterL_Limit2) { - //https://st.yandex-team.ru/KIKIMR-15900 - return; TAggregationTestCase testCase; testCase.SetQuery(R"( SELECT @@ -1845,14 +1843,13 @@ Y_UNIT_TEST_SUITE(KqpOlap) { LIMIT 2 )") .AddExpectedPlanOptions("KqpOlapFilter") + .AddExpectedPlanOptions("KqpOlapExtractMembers") .MutableLimitChecker().SetExpectedLimit(2); TestAggregations({ testCase }); } Y_UNIT_TEST(Aggregation_ResultT_FilterL_OrderT_Limit2) { - //https://st.yandex-team.ru/KIKIMR-15900 - return; TAggregationTestCase testCase; testCase.SetQuery(R"( SELECT @@ -1863,11 +1860,27 @@ Y_UNIT_TEST_SUITE(KqpOlap) { LIMIT 2 )") .AddExpectedPlanOptions("KqpOlapFilter") + .AddExpectedPlanOptions("KqpOlapExtractMembers") .MutableLimitChecker().SetExpectedLimit(2); TestAggregations({ testCase }); } + Y_UNIT_TEST(Aggregation_ResultL_FilterL_OrderL_Limit2) { + TAggregationTestCase testCase; + testCase.SetQuery(R"( + SELECT + timestamp, level + FROM `/Root/olapStore/olapTable` + WHERE level > 1 + ORDER BY level + LIMIT 2 + )") + .AddExpectedPlanOptions("KqpOlapFilter"); + + TestAggregations({ testCase }); + } + Y_UNIT_TEST(AggregationAndFilterPushdownOnDiffCols) { auto settings = TKikimrSettings() .SetWithSampleTables(false) diff --git a/ydb/core/tx/columnshard/columnshard__stats_scan.h b/ydb/core/tx/columnshard/columnshard__stats_scan.h index cc7fba4b373..ca5354b8d1d 100644 --- a/ydb/core/tx/columnshard/columnshard__stats_scan.h +++ b/ydb/core/tx/columnshard/columnshard__stats_scan.h @@ -53,13 +53,14 @@ public: ApplyRangePredicates(batch); + // Leave only requested columns + auto resultBatch = NArrow::ExtractColumns(batch, ResultSchema); + if (!ReadMetadata->Program.empty()) { auto status = ApplyProgram(batch, ReadMetadata->Program, NArrow::GetCustomExecContext()); Y_VERIFY_S(status.ok(), status.message()); } - // Leave only requested columns - auto resultBatch = NArrow::ExtractColumns(batch, ResultSchema); NOlap::TPartialReadResult out{std::move(resultBatch), std::move(lastKey)}; return out; diff --git a/ydb/library/yql/core/type_ann/type_ann_core.cpp b/ydb/library/yql/core/type_ann/type_ann_core.cpp index 9ab90ef4368..67db76b1b22 100644 --- a/ydb/library/yql/core/type_ann/type_ann_core.cpp +++ b/ydb/library/yql/core/type_ann/type_ann_core.cpp @@ -869,11 +869,11 @@ namespace NTypeAnnImpl { return result; } - TMaybe<ui32> FindOrReportMissingMember(TStringBuf memberName, TPositionHandle pos, const TStructExprType& structType, TContext& ctx) { + TMaybe<ui32> FindOrReportMissingMember(TStringBuf memberName, TPositionHandle pos, const TStructExprType& structType, TExprContext& ctx) { TString errStr; auto result = FindOrReportMissingMember(memberName, structType, errStr); if (!result) { - ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(pos), errStr)); + ctx.AddError(TIssue(ctx.GetPosition(pos), errStr)); } return result; } @@ -913,7 +913,7 @@ namespace NTypeAnnImpl { } auto memberName = input->Tail().Content(); - auto pos = FindOrReportMissingMember(memberName, input->Pos(), *structType, ctx); + auto pos = FindOrReportMissingMember(memberName, input->Pos(), *structType, ctx.Expr); if (!pos) { return IGraphTransformer::TStatus::Error; } @@ -1095,7 +1095,7 @@ namespace NTypeAnnImpl { bool isOptional = input->Head().GetTypeAnn()->GetKind() == ETypeAnnotationKind::Optional; const TStructExprType& structType = *RemoveAllOptionals(input->Head().GetTypeAnn())->Cast<TStructExprType>(); - auto pos = FindOrReportMissingMember(columnNameNode->Content(), input->Pos(), structType, ctx); + auto pos = FindOrReportMissingMember(columnNameNode->Content(), input->Pos(), structType, ctx.Expr); if (!pos) { return IGraphTransformer::TStatus::Error; } @@ -1706,7 +1706,7 @@ namespace NTypeAnnImpl { TVector<const TItemExprType*> newItems = structType->GetItems(); EraseIf(newItems, [&](const auto& item) { return item->GetName() == memberName; }); - if (!Forced && !FindOrReportMissingMember(memberName, input->Pos(), *structType, ctx)) { + if (!Forced && !FindOrReportMissingMember(memberName, input->Pos(), *structType, ctx.Expr)) { return IGraphTransformer::TStatus::Error; } @@ -1744,7 +1744,7 @@ namespace NTypeAnnImpl { auto memberName = child->Content(); EraseIf(newItems, [&](const auto& item) { return item->GetName() == memberName; }); - if (!Forced && !FindOrReportMissingMember(memberName, input->Pos(), *structType, ctx)) { + if (!Forced && !FindOrReportMissingMember(memberName, input->Pos(), *structType, ctx.Expr)) { return IGraphTransformer::TStatus::Error; } } @@ -1927,7 +1927,7 @@ namespace NTypeAnnImpl { auto memberName = input->Child(1)->Content(); auto structType = input->Head().GetTypeAnn()->Cast<TStructExprType>(); - auto pos = FindOrReportMissingMember(memberName, input->Pos(), *structType, ctx); + auto pos = FindOrReportMissingMember(memberName, input->Pos(), *structType, ctx.Expr); if (!pos) { return IGraphTransformer::TStatus::Error; } @@ -5032,7 +5032,7 @@ template <NKikimr::NUdf::EDataSlot DataSlot> const TTypeAnnotationNode* itemType; if (varType->GetUnderlyingType()->GetKind() == ETypeAnnotationKind::Struct) { auto structType = varType->GetUnderlyingType()->Cast<TStructExprType>(); - auto pos = FindOrReportMissingMember(input->Child(1)->Content(), input->Pos(), *structType, ctx); + auto pos = FindOrReportMissingMember(input->Child(1)->Content(), input->Pos(), *structType, ctx.Expr); if (!pos) { return IGraphTransformer::TStatus::Error; } @@ -7904,7 +7904,7 @@ template <NKikimr::NUdf::EDataSlot DataSlot> input->SetTypeAnn(tupleType->GetItems()[index]); } else { auto structType = variantType->GetUnderlyingType()->Cast<TStructExprType>(); - auto pos = FindOrReportMissingMember(input->Child(1)->Content(), input->Pos(), *structType, ctx); + auto pos = FindOrReportMissingMember(input->Child(1)->Content(), input->Pos(), *structType, ctx.Expr); if (!pos) { return IGraphTransformer::TStatus::Error; } @@ -8023,7 +8023,7 @@ template <NKikimr::NUdf::EDataSlot DataSlot> itemType = tupleType->GetItems()[index]; itemIndex = index; } else { - auto pos = FindOrReportMissingMember(child->Content(), child->Pos(), *structType, ctx); + auto pos = FindOrReportMissingMember(child->Content(), child->Pos(), *structType, ctx.Expr); if (!pos) { return IGraphTransformer::TStatus::Error; } @@ -11030,7 +11030,7 @@ template <NKikimr::NUdf::EDataSlot DataSlot> return IGraphTransformer::TStatus::Error; } - auto pos = FindOrReportMissingMember(key, keyNode->Pos(), structType, ctx); + auto pos = FindOrReportMissingMember(key, keyNode->Pos(), structType, ctx.Expr); if (!pos) { return IGraphTransformer::TStatus::Error; } diff --git a/ydb/library/yql/core/type_ann/type_ann_impl.h b/ydb/library/yql/core/type_ann/type_ann_impl.h index 31677f60ade..48e8ffd60a0 100644 --- a/ydb/library/yql/core/type_ann/type_ann_impl.h +++ b/ydb/library/yql/core/type_ann/type_ann_impl.h @@ -34,7 +34,7 @@ namespace NTypeAnnImpl { IGraphTransformer::TStatus CombineCoreWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx); IGraphTransformer::TStatus GroupingCoreWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx); - TMaybe<ui32> FindOrReportMissingMember(TStringBuf memberName, TPositionHandle pos, const TStructExprType& structType, TContext& ctx); + TMaybe<ui32> FindOrReportMissingMember(TStringBuf memberName, TPositionHandle pos, const TStructExprType& structType, TExprContext& ctx); TExprNode::TPtr MakeNothingData(TExprContext& ctx, TPositionHandle pos, TStringBuf data); } // namespace NTypeAnnImpl diff --git a/ydb/library/yql/core/type_ann/type_ann_list.cpp b/ydb/library/yql/core/type_ann/type_ann_list.cpp index 1f8bf18c6d9..0c267c4f134 100644 --- a/ydb/library/yql/core/type_ann/type_ann_list.cpp +++ b/ydb/library/yql/core/type_ann/type_ann_list.cpp @@ -4213,7 +4213,7 @@ namespace { TVector<const TItemExprType*> resItems; for (auto& x : input->Tail().Children()) { YQL_ENSURE(x->IsAtom()); - auto pos = FindOrReportMissingMember(x->Content(), input->Head().Pos(), *structType, ctx); + auto pos = FindOrReportMissingMember(x->Content(), input->Head().Pos(), *structType, ctx.Expr); if (!pos) { return IGraphTransformer::TStatus::Error; } @@ -4252,7 +4252,7 @@ namespace { const auto structType = inputItemType->Cast<TStructExprType>(); for (auto& x : input->Tail().Children()) { YQL_ENSURE(x->IsAtom()); - auto pos = FindOrReportMissingMember(x->Content(), input->Head().Pos(), *structType, ctx); + auto pos = FindOrReportMissingMember(x->Content(), input->Head().Pos(), *structType, ctx.Expr); if (!pos) { return IGraphTransformer::TStatus::Error; } @@ -4306,7 +4306,7 @@ namespace { for (auto& column : columnOrder) { YQL_ENSURE(column->IsAtom()); - auto pos = FindOrReportMissingMember(column->Content(), input->Head().Pos(), *structType, ctx); + auto pos = FindOrReportMissingMember(column->Content(), input->Head().Pos(), *structType, ctx.Expr); if (!pos) { return IGraphTransformer::TStatus::Error; } @@ -4762,7 +4762,7 @@ namespace { continue; } - auto item = FindOrReportMissingMember(child->Content(), child->Pos(), *inputStructType, ctx); + auto item = FindOrReportMissingMember(child->Content(), child->Pos(), *inputStructType, ctx.Expr); if (!item) { return IGraphTransformer::TStatus::Error; } |