diff options
author | vvvv <vvvv@ydb.tech> | 2022-11-22 22:40:45 +0300 |
---|---|---|
committer | vvvv <vvvv@ydb.tech> | 2022-11-22 22:40:45 +0300 |
commit | 56595c60783bd80390b4d410a80f60a7969e2a15 (patch) | |
tree | d7a0a05a3874eb73ef73a25cd99a39c3847c9e3c | |
parent | 8ce89cc8436ea7052bbed327149cd6be12987f53 (diff) | |
download | ydb-56595c60783bd80390b4d410a80f60a7969e2a15.tar.gz |
pushdown of filter into BlockCombineAll, supported for count(*)
6 files changed, 75 insertions, 32 deletions
diff --git a/ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.cpp b/ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.cpp index 15ec02217e6..5d419bf8f7a 100644 --- a/ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.cpp +++ b/ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.cpp @@ -4859,6 +4859,34 @@ TExprNode::TPtr OptimizeSkipTakeToBlocks(const TExprNode::TPtr& node, TExprConte .Build(); } +TExprNode::TPtr UpdateBlockCombineAllColumns(const TExprNode::TPtr& node, std::optional<ui32> filterColumn, const TVector<ui32>& argIndices, TExprContext& ctx) { + auto combineChildren = node->ChildrenList(); + combineChildren[0] = node->Head().HeadPtr(); + combineChildren[1] = ctx.NewAtom(node->Pos(), ToString(argIndices[FromString<ui32>(combineChildren[1]->Content())])); + if (filterColumn) { + YQL_ENSURE(combineChildren[2]->IsCallable("Void"), "Filter column is already used"); + combineChildren[2] = ctx.NewAtom(node->Pos(), ToString(*filterColumn)); + } else { + if (!combineChildren[2]->IsCallable("Void")) { + combineChildren[2] = ctx.NewAtom(node->Pos(), ToString(argIndices[FromString<ui32>(combineChildren[2]->Content())])); + } + } + + auto payloadNodes = combineChildren[3]->ChildrenList(); + for (auto& p : payloadNodes) { + YQL_ENSURE(p->IsList() && p->ChildrenSize() >= 1 && p->Head().IsCallable("AggBlockApply"), "Expected AggBlockApply"); + auto payloadArgs = p->ChildrenList(); + for (ui32 i = 1; i < payloadArgs.size(); ++i) { + payloadArgs[i] = ctx.NewAtom(node->Pos(), ToString(argIndices[FromString<ui32>(payloadArgs[i]->Content())])); + } + + p = ctx.ChangeChildren(*p, std::move(payloadArgs)); + } + + combineChildren[3] = ctx.ChangeChildren(*combineChildren[3], std::move(payloadNodes)); + return ctx.ChangeChildren(*node, std::move(combineChildren)); +} + TExprNode::TPtr OptimizeBlockCombineAll(const TExprNode::TPtr& node, TExprContext& ctx, TTypeAnnotationContext& types) { Y_UNUSED(types); if (node->Head().IsCallable("WideMap")) { @@ -4867,28 +4895,20 @@ TExprNode::TPtr OptimizeBlockCombineAll(const TExprNode::TPtr& node, TExprContex bool onlyArguments = IsArgumentsOnlyLambda(lambda, argIndices); if (onlyArguments) { YQL_CLOG(DEBUG, CorePeepHole) << "Drop renaming WideMap under " << node->Content(); - // change # for all columns, including count/filter - auto combineChildren = node->ChildrenList(); - combineChildren[0] = node->Head().HeadPtr(); - combineChildren[1] = ctx.NewAtom(node->Pos(), ToString(argIndices[FromString<ui32>(combineChildren[1]->Content())])); - if (!combineChildren[2]->IsCallable("Void")) { - combineChildren[2] = ctx.NewAtom(node->Pos(), ToString(argIndices[FromString<ui32>(combineChildren[2]->Content())])); - } - - auto payloadNodes = combineChildren[3]->ChildrenList(); - for (auto& p : payloadNodes) { - YQL_ENSURE(p->IsList() && p->ChildrenSize() >= 1 && p->Head().IsCallable("AggBlockApply"), "Expected AggBlockApply"); - auto payloadArgs = p->ChildrenList(); - for (ui32 i = 1; i < payloadArgs.size(); ++i) { - payloadArgs[i] = ctx.NewAtom(node->Pos(), ToString(argIndices[FromString<ui32>(payloadArgs[i]->Content())])); - } - - p = ctx.ChangeChildren(*p, std::move(payloadArgs)); - } + return UpdateBlockCombineAllColumns(node, {}, argIndices, ctx); + } + } - combineChildren[3] = ctx.ChangeChildren(*combineChildren[3], std::move(payloadNodes)); - return ctx.ChangeChildren(*node, std::move(combineChildren)); + if (node->Head().IsCallable("BlockCompress") && node->Child(2)->IsCallable("Void")) { + auto filterIndex = FromString<ui32>(node->Head().Child(1)->Content()); + TVector<ui32> argIndices; + argIndices.resize(node->Head().GetTypeAnn()->Cast<TFlowExprType>()->GetItemType()->Cast<TMultiExprType>()->GetSize()); + for (ui32 i = 0; i < argIndices.size(); ++i) { + argIndices[i] = (i < filterIndex) ? i : i + 1; } + + YQL_CLOG(DEBUG, CorePeepHole) << "Fuse " << node->Content() << " with " << node->Head().Content(); + return UpdateBlockCombineAllColumns(node, filterIndex, argIndices, ctx); } return node; diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg.cpp index 421b80dbcb1..f6acca37d35 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg.cpp @@ -6,6 +6,8 @@ #include <ydb/library/yql/minikql/mkql_node_cast.h> +#include <arrow/array/array_primitive.h> + namespace NKikimr { namespace NMiniKQL { @@ -53,10 +55,21 @@ public: continue; } + std::optional<ui64> filtered; + if (FilterColumn_) { + arrow::BooleanArray arr(TArrowBlock::From(s.Values_[*FilterColumn_]).GetDatum().array());
+ ui64 popCount = (ui64)arr.true_count(); + if (popCount == 0) { + continue; + } + + filtered = popCount; + } + s.HasValues_ = true; for (size_t i = 0; i < s.Aggs_.size(); ++i) { if (output[i]) { - s.Aggs_[i]->AddMany(s.Values_.data(), batchLength); + s.Aggs_[i]->AddMany(s.Values_.data(), batchLength, filtered); } } } else { @@ -142,7 +155,6 @@ IComputationNode* WrapBlockCombineAll(TCallable& callable, const TComputationNod filterColumn = AS_VALUE(TDataLiteral, filterColumnVal->GetItem())->AsValue().Get<ui32>(); } - MKQL_ENSURE(!filterColumn, "Filter column is not supported yet"); auto aggsVal = AS_VALUE(TTupleLiteral, callable.GetInput(3)); TVector<TAggParams> aggsParams; for (ui32 i = 0; i < aggsVal->GetValuesCount(); ++i) { diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_count.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_count.cpp index 169e12a6556..c985b182206 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_count.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_count.cpp @@ -10,9 +10,13 @@ public: { } - void AddMany(const NUdf::TUnboxedValue* columns, ui64 batchLength) final { + void AddMany(const NUdf::TUnboxedValue* columns, ui64 batchLength, std::optional<ui64> filtered) final { Y_UNUSED(columns); - State_ += batchLength; + if (filtered) { + State_ += *filtered; + } else { + State_ += batchLength; + } } NUdf::TUnboxedValue Finish() final { @@ -31,7 +35,8 @@ public: { } - void AddMany(const NUdf::TUnboxedValue* columns, ui64 batchLength) final { + void AddMany(const NUdf::TUnboxedValue* columns, ui64 batchLength, std::optional<ui64> filtered) final { + Y_ENSURE(!filtered); const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum(); if (datum.is_scalar()) { if (datum.scalar()->is_valid) { diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_factory.h b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_factory.h index a60b243e303..fc3a44270f4 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_factory.h +++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_factory.h @@ -10,7 +10,7 @@ class IBlockAggregator { public: virtual ~IBlockAggregator() = default; - virtual void AddMany(const NUdf::TUnboxedValue* columns, ui64 batchLength) = 0; + virtual void AddMany(const NUdf::TUnboxedValue* columns, ui64 batchLength, std::optional<ui64> filtered) = 0; virtual NUdf::TUnboxedValue Finish() = 0; }; diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_minmax.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_minmax.cpp index a0fde1f70d7..9cea6d1fbc7 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_minmax.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_minmax.cpp @@ -33,7 +33,8 @@ public: } } - void AddMany(const NUdf::TUnboxedValue* columns, ui64 batchLength) final { + void AddMany(const NUdf::TUnboxedValue* columns, ui64 batchLength, std::optional<ui64> filtered) final { + Y_ENSURE(!filtered); const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum(); if (datum.is_scalar()) { if (datum.scalar()->is_valid) { @@ -97,7 +98,8 @@ public: } } - void AddMany(const NUdf::TUnboxedValue* columns, ui64 batchLength) final { + void AddMany(const NUdf::TUnboxedValue* columns, ui64 batchLength, std::optional<ui64> filtered) final { + Y_ENSURE(!filtered); Y_UNUSED(batchLength); const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum(); MKQL_ENSURE(datum.is_array(), "Expected array"); @@ -137,7 +139,8 @@ public: } } - void AddMany(const NUdf::TUnboxedValue* columns, ui64 batchLength) final { + void AddMany(const NUdf::TUnboxedValue* columns, ui64 batchLength, std::optional<ui64> filtered) final { + Y_ENSURE(!filtered); const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum(); if (datum.is_scalar()) { if (datum.scalar()->is_valid) { diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_sum.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_sum.cpp index 2fe4f0d0d18..dbf0d5a5edb 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_sum.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_sum.cpp @@ -19,7 +19,8 @@ public: { } - void AddMany(const NUdf::TUnboxedValue* columns, ui64 batchLength) final { + void AddMany(const NUdf::TUnboxedValue* columns, ui64 batchLength, std::optional<ui64> filtered) final { + Y_ENSURE(!filtered); const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum(); if (datum.is_scalar()) { if (datum.scalar()->is_valid) { @@ -78,7 +79,8 @@ public: { } - void AddMany(const NUdf::TUnboxedValue* columns, ui64 batchLength) final { + void AddMany(const NUdf::TUnboxedValue* columns, ui64 batchLength, std::optional<ui64> filtered) final { + Y_ENSURE(!filtered); Y_UNUSED(batchLength); const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum(); MKQL_ENSURE(datum.is_array(), "Expected array"); @@ -115,7 +117,8 @@ public: { } - void AddMany(const NUdf::TUnboxedValue* columns, ui64 batchLength) final { + void AddMany(const NUdf::TUnboxedValue* columns, ui64 batchLength, std::optional<ui64> filtered) final { + Y_ENSURE(!filtered); const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum(); if (datum.is_scalar()) { if (datum.scalar()->is_valid) { |