diff options
author | aidarsamer <[email protected]> | 2023-02-16 13:08:29 +0300 |
---|---|---|
committer | aidarsamer <[email protected]> | 2023-02-16 13:08:29 +0300 |
commit | 1147111f58eb766b5c013fa5f4aecf55427a86a9 (patch) | |
tree | 35bbfaf7e8fa692d8b0a6f4161ed4009610c1321 | |
parent | f1eff52148cd5cf1c8b895eed1dc85c22d438eb9 (diff) |
Add support for flow and stream in expanding AggregateCombine callable to blocks
Add support for flow and stream in expanding AggregateCombine callable to blocks
-rw-r--r-- | ydb/core/kqp/ut/olap/kqp_olap_ut.cpp | 13 | ||||
-rw-r--r-- | ydb/library/yql/core/yql_aggregate_expander.cpp | 34 | ||||
-rw-r--r-- | ydb/library/yql/core/yql_aggregate_expander.h | 6 |
3 files changed, 38 insertions, 15 deletions
diff --git a/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp b/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp index 21cf87f61da..11011b6e07d 100644 --- a/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp +++ b/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp @@ -3848,6 +3848,19 @@ Y_UNIT_TEST_SUITE(KqpOlap) { TestTableWithNulls({ testCase }); } + + Y_UNIT_TEST(Blocks_NoAggPushdown) { + TAggregationTestCase testCase; + testCase.SetQuery(R"( + PRAGMA UseBlocks; + SELECT + COUNT(DISTINCT id) + FROM `/Root/tableWithNulls`; + )") + .SetExpectedReply("[[10u]]"); + + TestTableWithNulls({ testCase }); + } } } // namespace NKqp diff --git a/ydb/library/yql/core/yql_aggregate_expander.cpp b/ydb/library/yql/core/yql_aggregate_expander.cpp index 95e331f0943..1013b75d997 100644 --- a/ydb/library/yql/core/yql_aggregate_expander.cpp +++ b/ydb/library/yql/core/yql_aggregate_expander.cpp @@ -499,10 +499,10 @@ TExprNode::TPtr TAggregateExpander::GetFinalAggStateExtractor(ui32 i) { .Build(); } -TExprNode::TPtr TAggregateExpander::MakeInputBlocks(const TExprNode::TPtr& streamArg, TExprNode::TListType& keyIdxs, +TExprNode::TPtr TAggregateExpander::MakeInputBlocks(const TExprNode::TPtr& stream, TExprNode::TListType& keyIdxs, TVector<TString>& outputColumns, TExprNode::TListType& aggs, bool overState, bool many, ui32* streamIdxColumn) { - auto flow = Ctx.NewCallable(Node->Pos(), "ToFlow", { streamArg }); TVector<TString> inputColumns; + auto flow = Ctx.NewCallable(Node->Pos(), "ToFlow", { stream }); for (ui32 i = 0; i < RowType->GetSize(); ++i) { inputColumns.push_back(TString(RowType->GetItems()[i]->GetName())); } @@ -623,12 +623,18 @@ TExprNode::TPtr TAggregateExpander::TryGenerateBlockCombineAllOrHashed() { } const bool hashed = (KeyColumns->ChildrenSize() > 0); + const bool isInputList = (AggList->GetTypeAnn()->GetKind() == ETypeAnnotationKind::List); - auto streamArg = Ctx.NewArgument(Node->Pos(), "stream"); TExprNode::TListType keyIdxs; TVector<TString> outputColumns; TExprNode::TListType aggs; - auto blocks = MakeInputBlocks(streamArg, keyIdxs, outputColumns, aggs, false, false); + TExprNode::TPtr stream = nullptr; + if (isInputList) { + stream = Ctx.NewArgument(Node->Pos(), "stream"); + } else { + stream = AggList; + } + auto blocks = MakeInputBlocks(stream, keyIdxs, outputColumns, aggs, false, false); if (!blocks) { return nullptr; } @@ -658,15 +664,19 @@ TExprNode::TPtr TAggregateExpander::TryGenerateBlockCombineAllOrHashed() { } auto finalFlow = MakeNarrowMap(Node->Pos(), outputColumns, aggWideFlow, Ctx); - auto root = Ctx.NewCallable(Node->Pos(), "FromFlow", { finalFlow }); - auto lambdaStream = Ctx.NewLambda(Node->Pos(), Ctx.NewArguments(Node->Pos(), { streamArg }), std::move(root)); + if (isInputList) { + auto root = Ctx.NewCallable(Node->Pos(), "FromFlow", { finalFlow }); + auto lambdaStream = Ctx.NewLambda(Node->Pos(), Ctx.NewArguments(Node->Pos(), { stream }), std::move(root)); - return Ctx.Builder(Node->Pos()) - .Callable("LMap") - .Add(0, AggList) - .Add(1, lambdaStream) - .Seal() - .Build(); + return Ctx.Builder(Node->Pos()) + .Callable("LMap") + .Add(0, AggList) + .Add(1, lambdaStream) + .Seal() + .Build(); + } else { + return finalFlow; + } } TExprNode::TPtr TAggregateExpander::GeneratePartialAggregateForNonDistinct(const TExprNode::TPtr& keyExtractor, const TExprNode::TPtr& pickleTypeNode) diff --git a/ydb/library/yql/core/yql_aggregate_expander.h b/ydb/library/yql/core/yql_aggregate_expander.h index 0f983982e05..aa5adeeccf5 100644 --- a/ydb/library/yql/core/yql_aggregate_expander.h +++ b/ydb/library/yql/core/yql_aggregate_expander.h @@ -37,7 +37,7 @@ public: .Order = VoidNode }; } - + TExprNode::TPtr ExpandAggregate(); static TExprNode::TPtr CountAggregateRewrite(const NNodes::TCoAggregate& node, TExprContext& ctx, bool useBlocks); @@ -58,7 +58,7 @@ private: // Partial aggregate generation TExprNode::TPtr GeneratePartialAggregate(const TExprNode::TPtr keyExtractor, const TVector<const TTypeAnnotationNode*>& keyItemTypes, bool needPickle); TExprNode::TPtr GeneratePartialAggregateForNonDistinct(const TExprNode::TPtr& keyExtractor, const TExprNode::TPtr& pickleTypeNode); - + TExprNode::TPtr GenerateDistinctGrouper(const TExprNode::TPtr distinctField, const TVector<const TTypeAnnotationNode*>& keyItemTypes, bool needDistinctPickle); @@ -84,7 +84,7 @@ private: TExprNode::TPtr TryGenerateBlockMergeFinalizeHashed(); TExprNode::TPtr TryGenerateBlockCombine(); TExprNode::TPtr TryGenerateBlockMergeFinalize(); - TExprNode::TPtr MakeInputBlocks(const TExprNode::TPtr& streamArg, TExprNode::TListType& keyIdxs, + TExprNode::TPtr MakeInputBlocks(const TExprNode::TPtr& stream, TExprNode::TListType& keyIdxs, TVector<TString>& outputColumns, TExprNode::TListType& aggs, bool overState, bool many, ui32* streamIdxColumn = nullptr); private: |