summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoraidarsamer <[email protected]>2023-02-16 13:08:29 +0300
committeraidarsamer <[email protected]>2023-02-16 13:08:29 +0300
commit1147111f58eb766b5c013fa5f4aecf55427a86a9 (patch)
tree35bbfaf7e8fa692d8b0a6f4161ed4009610c1321
parentf1eff52148cd5cf1c8b895eed1dc85c22d438eb9 (diff)
Add support for flow and stream in expanding AggregateCombine callable to blocks
Add support for flow and stream in expanding AggregateCombine callable to blocks
-rw-r--r--ydb/core/kqp/ut/olap/kqp_olap_ut.cpp13
-rw-r--r--ydb/library/yql/core/yql_aggregate_expander.cpp34
-rw-r--r--ydb/library/yql/core/yql_aggregate_expander.h6
3 files changed, 38 insertions, 15 deletions
diff --git a/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp b/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp
index 21cf87f61da..11011b6e07d 100644
--- a/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp
+++ b/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp
@@ -3848,6 +3848,19 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
TestTableWithNulls({ testCase });
}
+
+ Y_UNIT_TEST(Blocks_NoAggPushdown) {
+ TAggregationTestCase testCase;
+ testCase.SetQuery(R"(
+ PRAGMA UseBlocks;
+ SELECT
+ COUNT(DISTINCT id)
+ FROM `/Root/tableWithNulls`;
+ )")
+ .SetExpectedReply("[[10u]]");
+
+ TestTableWithNulls({ testCase });
+ }
}
} // namespace NKqp
diff --git a/ydb/library/yql/core/yql_aggregate_expander.cpp b/ydb/library/yql/core/yql_aggregate_expander.cpp
index 95e331f0943..1013b75d997 100644
--- a/ydb/library/yql/core/yql_aggregate_expander.cpp
+++ b/ydb/library/yql/core/yql_aggregate_expander.cpp
@@ -499,10 +499,10 @@ TExprNode::TPtr TAggregateExpander::GetFinalAggStateExtractor(ui32 i) {
.Build();
}
-TExprNode::TPtr TAggregateExpander::MakeInputBlocks(const TExprNode::TPtr& streamArg, TExprNode::TListType& keyIdxs,
+TExprNode::TPtr TAggregateExpander::MakeInputBlocks(const TExprNode::TPtr& stream, TExprNode::TListType& keyIdxs,
TVector<TString>& outputColumns, TExprNode::TListType& aggs, bool overState, bool many, ui32* streamIdxColumn) {
- auto flow = Ctx.NewCallable(Node->Pos(), "ToFlow", { streamArg });
TVector<TString> inputColumns;
+ auto flow = Ctx.NewCallable(Node->Pos(), "ToFlow", { stream });
for (ui32 i = 0; i < RowType->GetSize(); ++i) {
inputColumns.push_back(TString(RowType->GetItems()[i]->GetName()));
}
@@ -623,12 +623,18 @@ TExprNode::TPtr TAggregateExpander::TryGenerateBlockCombineAllOrHashed() {
}
const bool hashed = (KeyColumns->ChildrenSize() > 0);
+ const bool isInputList = (AggList->GetTypeAnn()->GetKind() == ETypeAnnotationKind::List);
- auto streamArg = Ctx.NewArgument(Node->Pos(), "stream");
TExprNode::TListType keyIdxs;
TVector<TString> outputColumns;
TExprNode::TListType aggs;
- auto blocks = MakeInputBlocks(streamArg, keyIdxs, outputColumns, aggs, false, false);
+ TExprNode::TPtr stream = nullptr;
+ if (isInputList) {
+ stream = Ctx.NewArgument(Node->Pos(), "stream");
+ } else {
+ stream = AggList;
+ }
+ auto blocks = MakeInputBlocks(stream, keyIdxs, outputColumns, aggs, false, false);
if (!blocks) {
return nullptr;
}
@@ -658,15 +664,19 @@ TExprNode::TPtr TAggregateExpander::TryGenerateBlockCombineAllOrHashed() {
}
auto finalFlow = MakeNarrowMap(Node->Pos(), outputColumns, aggWideFlow, Ctx);
- auto root = Ctx.NewCallable(Node->Pos(), "FromFlow", { finalFlow });
- auto lambdaStream = Ctx.NewLambda(Node->Pos(), Ctx.NewArguments(Node->Pos(), { streamArg }), std::move(root));
+ if (isInputList) {
+ auto root = Ctx.NewCallable(Node->Pos(), "FromFlow", { finalFlow });
+ auto lambdaStream = Ctx.NewLambda(Node->Pos(), Ctx.NewArguments(Node->Pos(), { stream }), std::move(root));
- return Ctx.Builder(Node->Pos())
- .Callable("LMap")
- .Add(0, AggList)
- .Add(1, lambdaStream)
- .Seal()
- .Build();
+ return Ctx.Builder(Node->Pos())
+ .Callable("LMap")
+ .Add(0, AggList)
+ .Add(1, lambdaStream)
+ .Seal()
+ .Build();
+ } else {
+ return finalFlow;
+ }
}
TExprNode::TPtr TAggregateExpander::GeneratePartialAggregateForNonDistinct(const TExprNode::TPtr& keyExtractor, const TExprNode::TPtr& pickleTypeNode)
diff --git a/ydb/library/yql/core/yql_aggregate_expander.h b/ydb/library/yql/core/yql_aggregate_expander.h
index 0f983982e05..aa5adeeccf5 100644
--- a/ydb/library/yql/core/yql_aggregate_expander.h
+++ b/ydb/library/yql/core/yql_aggregate_expander.h
@@ -37,7 +37,7 @@ public:
.Order = VoidNode
};
}
-
+
TExprNode::TPtr ExpandAggregate();
static TExprNode::TPtr CountAggregateRewrite(const NNodes::TCoAggregate& node, TExprContext& ctx, bool useBlocks);
@@ -58,7 +58,7 @@ private:
// Partial aggregate generation
TExprNode::TPtr GeneratePartialAggregate(const TExprNode::TPtr keyExtractor, const TVector<const TTypeAnnotationNode*>& keyItemTypes, bool needPickle);
TExprNode::TPtr GeneratePartialAggregateForNonDistinct(const TExprNode::TPtr& keyExtractor, const TExprNode::TPtr& pickleTypeNode);
-
+
TExprNode::TPtr GenerateDistinctGrouper(const TExprNode::TPtr distinctField,
const TVector<const TTypeAnnotationNode*>& keyItemTypes, bool needDistinctPickle);
@@ -84,7 +84,7 @@ private:
TExprNode::TPtr TryGenerateBlockMergeFinalizeHashed();
TExprNode::TPtr TryGenerateBlockCombine();
TExprNode::TPtr TryGenerateBlockMergeFinalize();
- TExprNode::TPtr MakeInputBlocks(const TExprNode::TPtr& streamArg, TExprNode::TListType& keyIdxs,
+ TExprNode::TPtr MakeInputBlocks(const TExprNode::TPtr& stream, TExprNode::TListType& keyIdxs,
TVector<TString>& outputColumns, TExprNode::TListType& aggs, bool overState, bool many, ui32* streamIdxColumn = nullptr);
private: