aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoraidarsamer <aidarsamer@ydb.tech>2022-10-13 13:08:52 +0300
committeraidarsamer <aidarsamer@ydb.tech>2022-10-13 13:08:52 +0300
commit7d2fae9eb08cb6452e33826ebd9cd19c4d1ea776 (patch)
tree01875c5200b4eb0eaebd48cff614052305ddde78
parent9cc5d1d5dbe564b03d97a801aa7ad883628ee531 (diff)
downloadydb-7d2fae9eb08cb6452e33826ebd9cd19c4d1ea776.tar.gz
Fix limit and ExtractMembers pushdown to column shards.
Fix limit and ExtractMemers pushdown to column shard
-rw-r--r--ydb/core/kqp/compile/kqp_olap_compiler.cpp14
-rw-r--r--ydb/core/kqp/expr_nodes/kqp_expr_nodes.json8
-rw-r--r--ydb/core/kqp/opt/logical/kqp_opt_log_extract.cpp56
-rw-r--r--ydb/core/kqp/prepare/kqp_type_ann.cpp41
-rw-r--r--ydb/core/kqp/ut/kqp_olap_ut.cpp23
-rw-r--r--ydb/core/tx/columnshard/columnshard__stats_scan.h5
-rw-r--r--ydb/library/yql/core/type_ann/type_ann_core.cpp22
-rw-r--r--ydb/library/yql/core/type_ann/type_ann_impl.h2
-rw-r--r--ydb/library/yql/core/type_ann/type_ann_list.cpp8
9 files changed, 140 insertions, 39 deletions
diff --git a/ydb/core/kqp/compile/kqp_olap_compiler.cpp b/ydb/core/kqp/compile/kqp_olap_compiler.cpp
index 351110d4767..dcac5ac61c2 100644
--- a/ydb/core/kqp/compile/kqp_olap_compiler.cpp
+++ b/ydb/core/kqp/compile/kqp_olap_compiler.cpp
@@ -407,6 +407,17 @@ void CompileAggregates(const TKqpOlapAgg& aggNode, TKqpOlapCompileContext& ctx)
}
}
+void CompileProjection(const TKqpOlapExtractMembers& extractMembers, TKqpOlapCompileContext& ctx) {
+ auto* projection = ctx.CreateProjection();
+ for (auto col : extractMembers.Members()) {
+ auto colName = col.StringValue();
+ auto colId = GetOrCreateColumnId(col, ctx);
+
+ auto* projCol = projection->AddColumns();
+ projCol->SetId(colId);
+ }
+}
+
void CompileOlapProgramImpl(TExprBase operation, TKqpOlapCompileContext& ctx) {
if (operation.Raw() == ctx.GetRowExpr()) {
return;
@@ -420,6 +431,9 @@ void CompileOlapProgramImpl(TExprBase operation, TKqpOlapCompileContext& ctx) {
} else if (auto maybeAgg = operation.Maybe<TKqpOlapAgg>()) {
CompileAggregates(maybeAgg.Cast(), ctx);
return;
+ } else if (auto maybeExtractMembers = operation.Maybe<TKqpOlapExtractMembers>()) {
+ CompileProjection(maybeExtractMembers.Cast(), ctx);
+ return;
}
}
diff --git a/ydb/core/kqp/expr_nodes/kqp_expr_nodes.json b/ydb/core/kqp/expr_nodes/kqp_expr_nodes.json
index 6c3ebee989e..9b5d293456e 100644
--- a/ydb/core/kqp/expr_nodes/kqp_expr_nodes.json
+++ b/ydb/core/kqp/expr_nodes/kqp_expr_nodes.json
@@ -465,6 +465,14 @@
]
},
{
+ "Name": "TKqpOlapExtractMembers",
+ "Base": "TKqpOlapOperationBase",
+ "Match": {"Type": "Callable", "Name": "TKqpOlapExtractMembers"},
+ "Children": [
+ {"Index": 1, "Name": "Members", "Type": "TCoAtomList"}
+ ]
+ },
+ {
"Name": "TKqpEnsure",
"Base": "TCallable",
"Match": {"Type": "Callable", "Name": "KqpEnsure"},
diff --git a/ydb/core/kqp/opt/logical/kqp_opt_log_extract.cpp b/ydb/core/kqp/opt/logical/kqp_opt_log_extract.cpp
index 1234ee976ed..ba6f77981af 100644
--- a/ydb/core/kqp/opt/logical/kqp_opt_log_extract.cpp
+++ b/ydb/core/kqp/opt/logical/kqp_opt_log_extract.cpp
@@ -131,27 +131,51 @@ TExprBase KqpApplyExtractMembersToReadOlapTable(TExprBase node, TExprContext& ct
auto read = node.Cast<TKqpReadOlapTableRangesBase>();
- // When process is set it may use columns in read.Columns() but those columns may not be present
- // in the results. Thus do not apply extract members if process is not empty lambda
- // TODO: Support process lambda in this rule.
- if (read.Process().Body().Raw() != read.Process().Args().Arg(0).Raw()) {
- return node;
- }
-
auto usedColumns = GetUsedColumns(read, read.Columns(), parentsMap, allowMultiUsage, ctx);
if (!usedColumns) {
return node;
}
- return Build<TKqpReadOlapTableRangesBase>(ctx, read.Pos())
- .CallableName(read.CallableName())
- .Table(read.Table())
- .Ranges(read.Ranges())
- .Columns(usedColumns.Cast())
- .Settings(read.Settings())
- .ExplainPrompt(read.ExplainPrompt())
- .Process(read.Process())
- .Done();
+ // When process is set it may use columns in read.Columns() but those columns may not be present
+ // in the results. Thus do not apply extract members if process is not empty lambda
+ if (read.Process().Body().Raw() != read.Process().Args().Arg(0).Raw()) {
+ auto extractMembers = Build<TKqpOlapExtractMembers>(ctx, node.Pos())
+ .Input(read.Process().Args().Arg(0))
+ .Members(usedColumns.Cast())
+ .Done();
+
+ auto extractMembersLambda = Build<TCoLambda>(ctx, node.Pos())
+ .Args({"row"})
+ .Body<TExprApplier>()
+ .Apply(extractMembers)
+ .With(read.Process().Args().Arg(0), "row")
+ .Build()
+ .Done();
+
+ auto newProcessLambda = ctx.FuseLambdas(extractMembersLambda.Ref(), read.Process().Ref());
+
+ YQL_CLOG(INFO, ProviderKqp) << "Pushed ExtractMembers lambda: " << KqpExprToPrettyString(*newProcessLambda, ctx);
+
+ return Build<TKqpReadOlapTableRangesBase>(ctx, read.Pos())
+ .CallableName(read.CallableName())
+ .Table(read.Table())
+ .Ranges(read.Ranges())
+ .Columns(read.Columns())
+ .Settings(read.Settings())
+ .ExplainPrompt(read.ExplainPrompt())
+ .Process(newProcessLambda)
+ .Done();
+ } else {
+ return Build<TKqpReadOlapTableRangesBase>(ctx, read.Pos())
+ .CallableName(read.CallableName())
+ .Table(read.Table())
+ .Ranges(read.Ranges())
+ .Columns(usedColumns.Cast())
+ .Settings(read.Settings())
+ .ExplainPrompt(read.ExplainPrompt())
+ .Process(read.Process())
+ .Done();
+ }
}
TExprBase KqpApplyExtractMembersToLookupTable(TExprBase node, TExprContext& ctx, const TParentsMap& parentsMap,
diff --git a/ydb/core/kqp/prepare/kqp_type_ann.cpp b/ydb/core/kqp/prepare/kqp_type_ann.cpp
index bfa6d038d4a..80afc03b442 100644
--- a/ydb/core/kqp/prepare/kqp_type_ann.cpp
+++ b/ydb/core/kqp/prepare/kqp_type_ann.cpp
@@ -4,6 +4,7 @@
#include <ydb/core/kqp/provider/yql_kikimr_provider_impl.h>
#include <ydb/library/yql/core/type_ann/type_ann_core.h>
+#include "ydb/library/yql/core/type_ann/type_ann_impl.h"
#include <ydb/library/yql/core/yql_opt_utils.h>
#include <ydb/library/yql/dq/type_ann/dq_type_ann.h>
#include <ydb/library/yql/utils/log/log.h>
@@ -920,6 +921,42 @@ TStatus AnnotateOlapAgg(const TExprNode::TPtr& node, TExprContext& ctx) {
return TStatus::Ok;
}
+
+TStatus AnnotateOlapExtractMembers(const TExprNode::TPtr& node, TExprContext& ctx) {
+ if (!EnsureArgsCount(*node, 2, ctx)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ const TTypeAnnotationNode* nodeItemType = nullptr;
+ if (!EnsureNewSeqType<true>(node->Head(), ctx, &nodeItemType)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ if (!EnsureStructType(node->Head().Pos(), *nodeItemType, ctx)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ const auto structType = nodeItemType->Cast<TStructExprType>();
+ TVector<const TItemExprType*> resItems;
+ for (auto& x : node->Tail().Children()) {
+ YQL_ENSURE(x->IsAtom());
+ auto pos = NYql::NTypeAnnImpl::FindOrReportMissingMember(x->Content(), node->Head().Pos(), *structType, ctx);
+ if (!pos) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ resItems.push_back(structType->GetItems()[*pos]);
+ }
+
+ const auto resItemType = ctx.MakeType<TStructExprType>(resItems);
+ if (!resItemType->Validate(node->Pos(), ctx)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ node->SetTypeAnn(MakeSequenceType(node->Head().GetTypeAnn()->GetKind(), *resItemType, ctx));
+ return IGraphTransformer::TStatus::Ok;
+}
+
TStatus AnnotateKqpTxInternalBinding(const TExprNode::TPtr& node, TExprContext& ctx) {
if (!EnsureArgsCount(*node, 2, ctx)) {
return TStatus::Error;
@@ -1237,6 +1274,10 @@ TAutoPtr<IGraphTransformer> CreateKqpTypeAnnotationTransformer(const TString& cl
return AnnotateOlapAgg(input, ctx);
}
+ if (TKqpOlapExtractMembers::Match(input.Get())) {
+ return AnnotateOlapExtractMembers(input, ctx);
+ }
+
if (TKqpCnMapShard::Match(input.Get()) || TKqpCnShuffleShard::Match(input.Get())) {
return AnnotateDqConnection(input, ctx);
}
diff --git a/ydb/core/kqp/ut/kqp_olap_ut.cpp b/ydb/core/kqp/ut/kqp_olap_ut.cpp
index 1c290d31b4a..79d26ffb590 100644
--- a/ydb/core/kqp/ut/kqp_olap_ut.cpp
+++ b/ydb/core/kqp/ut/kqp_olap_ut.cpp
@@ -1751,7 +1751,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
}
Y_UNIT_TEST(Aggregation_ResultCountAll_FilterL) {
- //https://st.yandex-team.ru/KIKIMR-15900
+ //https://st.yandex-team.ru/KIKIMR-16073
return;
TAggregationTestCase testCase;
testCase.SetQuery(R"(
@@ -1834,8 +1834,6 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
}
Y_UNIT_TEST(Aggregation_ResultT_FilterL_Limit2) {
- //https://st.yandex-team.ru/KIKIMR-15900
- return;
TAggregationTestCase testCase;
testCase.SetQuery(R"(
SELECT
@@ -1845,14 +1843,13 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
LIMIT 2
)")
.AddExpectedPlanOptions("KqpOlapFilter")
+ .AddExpectedPlanOptions("KqpOlapExtractMembers")
.MutableLimitChecker().SetExpectedLimit(2);
TestAggregations({ testCase });
}
Y_UNIT_TEST(Aggregation_ResultT_FilterL_OrderT_Limit2) {
- //https://st.yandex-team.ru/KIKIMR-15900
- return;
TAggregationTestCase testCase;
testCase.SetQuery(R"(
SELECT
@@ -1863,11 +1860,27 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
LIMIT 2
)")
.AddExpectedPlanOptions("KqpOlapFilter")
+ .AddExpectedPlanOptions("KqpOlapExtractMembers")
.MutableLimitChecker().SetExpectedLimit(2);
TestAggregations({ testCase });
}
+ Y_UNIT_TEST(Aggregation_ResultL_FilterL_OrderL_Limit2) {
+ TAggregationTestCase testCase;
+ testCase.SetQuery(R"(
+ SELECT
+ timestamp, level
+ FROM `/Root/olapStore/olapTable`
+ WHERE level > 1
+ ORDER BY level
+ LIMIT 2
+ )")
+ .AddExpectedPlanOptions("KqpOlapFilter");
+
+ TestAggregations({ testCase });
+ }
+
Y_UNIT_TEST(AggregationAndFilterPushdownOnDiffCols) {
auto settings = TKikimrSettings()
.SetWithSampleTables(false)
diff --git a/ydb/core/tx/columnshard/columnshard__stats_scan.h b/ydb/core/tx/columnshard/columnshard__stats_scan.h
index cc7fba4b373..ca5354b8d1d 100644
--- a/ydb/core/tx/columnshard/columnshard__stats_scan.h
+++ b/ydb/core/tx/columnshard/columnshard__stats_scan.h
@@ -53,13 +53,14 @@ public:
ApplyRangePredicates(batch);
+ // Leave only requested columns
+ auto resultBatch = NArrow::ExtractColumns(batch, ResultSchema);
+
if (!ReadMetadata->Program.empty()) {
auto status = ApplyProgram(batch, ReadMetadata->Program, NArrow::GetCustomExecContext());
Y_VERIFY_S(status.ok(), status.message());
}
- // Leave only requested columns
- auto resultBatch = NArrow::ExtractColumns(batch, ResultSchema);
NOlap::TPartialReadResult out{std::move(resultBatch), std::move(lastKey)};
return out;
diff --git a/ydb/library/yql/core/type_ann/type_ann_core.cpp b/ydb/library/yql/core/type_ann/type_ann_core.cpp
index 9ab90ef4368..67db76b1b22 100644
--- a/ydb/library/yql/core/type_ann/type_ann_core.cpp
+++ b/ydb/library/yql/core/type_ann/type_ann_core.cpp
@@ -869,11 +869,11 @@ namespace NTypeAnnImpl {
return result;
}
- TMaybe<ui32> FindOrReportMissingMember(TStringBuf memberName, TPositionHandle pos, const TStructExprType& structType, TContext& ctx) {
+ TMaybe<ui32> FindOrReportMissingMember(TStringBuf memberName, TPositionHandle pos, const TStructExprType& structType, TExprContext& ctx) {
TString errStr;
auto result = FindOrReportMissingMember(memberName, structType, errStr);
if (!result) {
- ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(pos), errStr));
+ ctx.AddError(TIssue(ctx.GetPosition(pos), errStr));
}
return result;
}
@@ -913,7 +913,7 @@ namespace NTypeAnnImpl {
}
auto memberName = input->Tail().Content();
- auto pos = FindOrReportMissingMember(memberName, input->Pos(), *structType, ctx);
+ auto pos = FindOrReportMissingMember(memberName, input->Pos(), *structType, ctx.Expr);
if (!pos) {
return IGraphTransformer::TStatus::Error;
}
@@ -1095,7 +1095,7 @@ namespace NTypeAnnImpl {
bool isOptional = input->Head().GetTypeAnn()->GetKind() == ETypeAnnotationKind::Optional;
const TStructExprType& structType = *RemoveAllOptionals(input->Head().GetTypeAnn())->Cast<TStructExprType>();
- auto pos = FindOrReportMissingMember(columnNameNode->Content(), input->Pos(), structType, ctx);
+ auto pos = FindOrReportMissingMember(columnNameNode->Content(), input->Pos(), structType, ctx.Expr);
if (!pos) {
return IGraphTransformer::TStatus::Error;
}
@@ -1706,7 +1706,7 @@ namespace NTypeAnnImpl {
TVector<const TItemExprType*> newItems = structType->GetItems();
EraseIf(newItems, [&](const auto& item) { return item->GetName() == memberName; });
- if (!Forced && !FindOrReportMissingMember(memberName, input->Pos(), *structType, ctx)) {
+ if (!Forced && !FindOrReportMissingMember(memberName, input->Pos(), *structType, ctx.Expr)) {
return IGraphTransformer::TStatus::Error;
}
@@ -1744,7 +1744,7 @@ namespace NTypeAnnImpl {
auto memberName = child->Content();
EraseIf(newItems, [&](const auto& item) { return item->GetName() == memberName; });
- if (!Forced && !FindOrReportMissingMember(memberName, input->Pos(), *structType, ctx)) {
+ if (!Forced && !FindOrReportMissingMember(memberName, input->Pos(), *structType, ctx.Expr)) {
return IGraphTransformer::TStatus::Error;
}
}
@@ -1927,7 +1927,7 @@ namespace NTypeAnnImpl {
auto memberName = input->Child(1)->Content();
auto structType = input->Head().GetTypeAnn()->Cast<TStructExprType>();
- auto pos = FindOrReportMissingMember(memberName, input->Pos(), *structType, ctx);
+ auto pos = FindOrReportMissingMember(memberName, input->Pos(), *structType, ctx.Expr);
if (!pos) {
return IGraphTransformer::TStatus::Error;
}
@@ -5032,7 +5032,7 @@ template <NKikimr::NUdf::EDataSlot DataSlot>
const TTypeAnnotationNode* itemType;
if (varType->GetUnderlyingType()->GetKind() == ETypeAnnotationKind::Struct) {
auto structType = varType->GetUnderlyingType()->Cast<TStructExprType>();
- auto pos = FindOrReportMissingMember(input->Child(1)->Content(), input->Pos(), *structType, ctx);
+ auto pos = FindOrReportMissingMember(input->Child(1)->Content(), input->Pos(), *structType, ctx.Expr);
if (!pos) {
return IGraphTransformer::TStatus::Error;
}
@@ -7904,7 +7904,7 @@ template <NKikimr::NUdf::EDataSlot DataSlot>
input->SetTypeAnn(tupleType->GetItems()[index]);
} else {
auto structType = variantType->GetUnderlyingType()->Cast<TStructExprType>();
- auto pos = FindOrReportMissingMember(input->Child(1)->Content(), input->Pos(), *structType, ctx);
+ auto pos = FindOrReportMissingMember(input->Child(1)->Content(), input->Pos(), *structType, ctx.Expr);
if (!pos) {
return IGraphTransformer::TStatus::Error;
}
@@ -8023,7 +8023,7 @@ template <NKikimr::NUdf::EDataSlot DataSlot>
itemType = tupleType->GetItems()[index];
itemIndex = index;
} else {
- auto pos = FindOrReportMissingMember(child->Content(), child->Pos(), *structType, ctx);
+ auto pos = FindOrReportMissingMember(child->Content(), child->Pos(), *structType, ctx.Expr);
if (!pos) {
return IGraphTransformer::TStatus::Error;
}
@@ -11030,7 +11030,7 @@ template <NKikimr::NUdf::EDataSlot DataSlot>
return IGraphTransformer::TStatus::Error;
}
- auto pos = FindOrReportMissingMember(key, keyNode->Pos(), structType, ctx);
+ auto pos = FindOrReportMissingMember(key, keyNode->Pos(), structType, ctx.Expr);
if (!pos) {
return IGraphTransformer::TStatus::Error;
}
diff --git a/ydb/library/yql/core/type_ann/type_ann_impl.h b/ydb/library/yql/core/type_ann/type_ann_impl.h
index 31677f60ade..48e8ffd60a0 100644
--- a/ydb/library/yql/core/type_ann/type_ann_impl.h
+++ b/ydb/library/yql/core/type_ann/type_ann_impl.h
@@ -34,7 +34,7 @@ namespace NTypeAnnImpl {
IGraphTransformer::TStatus CombineCoreWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx);
IGraphTransformer::TStatus GroupingCoreWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx);
- TMaybe<ui32> FindOrReportMissingMember(TStringBuf memberName, TPositionHandle pos, const TStructExprType& structType, TContext& ctx);
+ TMaybe<ui32> FindOrReportMissingMember(TStringBuf memberName, TPositionHandle pos, const TStructExprType& structType, TExprContext& ctx);
TExprNode::TPtr MakeNothingData(TExprContext& ctx, TPositionHandle pos, TStringBuf data);
} // namespace NTypeAnnImpl
diff --git a/ydb/library/yql/core/type_ann/type_ann_list.cpp b/ydb/library/yql/core/type_ann/type_ann_list.cpp
index 1f8bf18c6d9..0c267c4f134 100644
--- a/ydb/library/yql/core/type_ann/type_ann_list.cpp
+++ b/ydb/library/yql/core/type_ann/type_ann_list.cpp
@@ -4213,7 +4213,7 @@ namespace {
TVector<const TItemExprType*> resItems;
for (auto& x : input->Tail().Children()) {
YQL_ENSURE(x->IsAtom());
- auto pos = FindOrReportMissingMember(x->Content(), input->Head().Pos(), *structType, ctx);
+ auto pos = FindOrReportMissingMember(x->Content(), input->Head().Pos(), *structType, ctx.Expr);
if (!pos) {
return IGraphTransformer::TStatus::Error;
}
@@ -4252,7 +4252,7 @@ namespace {
const auto structType = inputItemType->Cast<TStructExprType>();
for (auto& x : input->Tail().Children()) {
YQL_ENSURE(x->IsAtom());
- auto pos = FindOrReportMissingMember(x->Content(), input->Head().Pos(), *structType, ctx);
+ auto pos = FindOrReportMissingMember(x->Content(), input->Head().Pos(), *structType, ctx.Expr);
if (!pos) {
return IGraphTransformer::TStatus::Error;
}
@@ -4306,7 +4306,7 @@ namespace {
for (auto& column : columnOrder) {
YQL_ENSURE(column->IsAtom());
- auto pos = FindOrReportMissingMember(column->Content(), input->Head().Pos(), *structType, ctx);
+ auto pos = FindOrReportMissingMember(column->Content(), input->Head().Pos(), *structType, ctx.Expr);
if (!pos) {
return IGraphTransformer::TStatus::Error;
}
@@ -4762,7 +4762,7 @@ namespace {
continue;
}
- auto item = FindOrReportMissingMember(child->Content(), child->Pos(), *inputStructType, ctx);
+ auto item = FindOrReportMissingMember(child->Content(), child->Pos(), *inputStructType, ctx.Expr);
if (!item) {
return IGraphTransformer::TStatus::Error;
}