diff options
author | vvvv <vvvv@ydb.tech> | 2022-12-13 12:21:02 +0300 |
---|---|---|
committer | vvvv <vvvv@ydb.tech> | 2022-12-13 12:21:02 +0300 |
commit | 1cb79b33514086fa93c2d365e17f961e3ae6ab84 (patch) | |
tree | fc15bad1004fcf6efdaeb3a70ef57707e76f891f | |
parent | d03c8226fc53c73dc64292d05366de418d8fb52a (diff) | |
download | ydb-1cb79b33514086fa93c2d365e17f961e3ae6ab84.tar.gz |
pushdown of filter into hashed combiner
-rw-r--r-- | ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.cpp | 28 | ||||
-rw-r--r-- | ydb/library/yql/minikql/comp_nodes/mkql_block_agg.cpp | 56 |
2 files changed, 66 insertions, 18 deletions
diff --git a/ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.cpp b/ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.cpp index 7e879b086ea..20d6fae8d5c 100644 --- a/ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.cpp +++ b/ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.cpp @@ -4881,7 +4881,7 @@ TExprNode::TPtr OptimizeSkipTakeToBlocks(const TExprNode::TPtr& node, TExprConte .Build(); } -TExprNode::TPtr UpdateBlockCombineAllColumns(const TExprNode::TPtr& node, std::optional<ui32> filterColumn, const TVector<ui32>& argIndices, TExprContext& ctx) { +TExprNode::TPtr UpdateBlockCombineColumns(const TExprNode::TPtr& node, std::optional<ui32> filterColumn, const TVector<ui32>& argIndices, TExprContext& ctx) { auto combineChildren = node->ChildrenList(); combineChildren[0] = node->Head().HeadPtr(); if (filterColumn) { @@ -4889,11 +4889,22 @@ TExprNode::TPtr UpdateBlockCombineAllColumns(const TExprNode::TPtr& node, std::o combineChildren[1] = ctx.NewAtom(node->Pos(), ToString(*filterColumn)); } else { if (!combineChildren[1]->IsCallable("Void")) { - combineChildren[1] = ctx.NewAtom(node->Pos(), ToString(argIndices[FromString<ui32>(combineChildren[2]->Content())])); + combineChildren[1] = ctx.NewAtom(node->Pos(), ToString(argIndices[FromString<ui32>(combineChildren[1]->Content())])); } } - auto payloadNodes = combineChildren[2]->ChildrenList(); + const bool hashed = node->Content().EndsWith("Hashed"); + if (hashed) { + auto keyNodes = combineChildren[2]->ChildrenList(); + for (auto& p : keyNodes) { + p = ctx.NewAtom(node->Pos(), ToString(argIndices[FromString<ui32>(p->Content())])); + } + + combineChildren[2] = ctx.ChangeChildren(*combineChildren[2], std::move(keyNodes)); + } + + auto payloadIndex = hashed ? 3 : 2; + auto payloadNodes = combineChildren[payloadIndex]->ChildrenList(); for (auto& p : payloadNodes) { YQL_ENSURE(p->IsList() && p->ChildrenSize() >= 1 && p->Head().IsCallable("AggBlockApply"), "Expected AggBlockApply"); auto payloadArgs = p->ChildrenList(); @@ -4904,11 +4915,11 @@ TExprNode::TPtr UpdateBlockCombineAllColumns(const TExprNode::TPtr& node, std::o p = ctx.ChangeChildren(*p, std::move(payloadArgs)); } - combineChildren[2] = ctx.ChangeChildren(*combineChildren[2], std::move(payloadNodes)); + combineChildren[payloadIndex] = ctx.ChangeChildren(*combineChildren[payloadIndex], std::move(payloadNodes)); return ctx.ChangeChildren(*node, std::move(combineChildren)); } -TExprNode::TPtr OptimizeBlockCombineAll(const TExprNode::TPtr& node, TExprContext& ctx, TTypeAnnotationContext& types) { +TExprNode::TPtr OptimizeBlockCombine(const TExprNode::TPtr& node, TExprContext& ctx, TTypeAnnotationContext& types) { Y_UNUSED(types); if (node->Head().IsCallable("WideMap")) { const auto& lambda = node->Head().Tail(); @@ -4916,7 +4927,7 @@ TExprNode::TPtr OptimizeBlockCombineAll(const TExprNode::TPtr& node, TExprContex bool onlyArguments = IsArgumentsOnlyLambda(lambda, argIndices); if (onlyArguments) { YQL_CLOG(DEBUG, CorePeepHole) << "Drop renaming WideMap under " << node->Content(); - return UpdateBlockCombineAllColumns(node, {}, argIndices, ctx); + return UpdateBlockCombineColumns(node, {}, argIndices, ctx); } } @@ -4929,7 +4940,7 @@ TExprNode::TPtr OptimizeBlockCombineAll(const TExprNode::TPtr& node, TExprContex } YQL_CLOG(DEBUG, CorePeepHole) << "Fuse " << node->Content() << " with " << node->Head().Content(); - return UpdateBlockCombineAllColumns(node, filterIndex, argIndices, ctx); + return UpdateBlockCombineColumns(node, filterIndex, argIndices, ctx); } return node; @@ -6713,7 +6724,8 @@ struct TPeepHoleRules { {"WideToBlocks", &OptimizeWideToBlocks}, {"Skip", &OptimizeSkipTakeToBlocks}, {"Take", &OptimizeSkipTakeToBlocks}, - {"BlockCombineAll", &OptimizeBlockCombineAll}, + {"BlockCombineAll", &OptimizeBlockCombine}, + {"BlockCombineHashed", &OptimizeBlockCombine}, }; TPeepHoleRules() diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg.cpp index 729d014c765..5ffdb8ad4a9 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg.cpp @@ -485,10 +485,10 @@ TStringBuf GetKeyView(const TSSOKey& key) { return key.AsView(); } -template <typename TKey, bool UseSet> -class TBlockCombineHashedWrapper : public TStatefulWideFlowComputationNode<TBlockCombineHashedWrapper<TKey, UseSet>> { +template <typename TKey, bool UseSet, bool UseFilter> +class TBlockCombineHashedWrapper : public TStatefulWideFlowComputationNode<TBlockCombineHashedWrapper<TKey, UseSet, UseFilter>> { public: - using TSelf = TBlockCombineHashedWrapper<TKey, UseSet>; + using TSelf = TBlockCombineHashedWrapper<TKey, UseSet, UseFilter>; using TBase = TStatefulWideFlowComputationNode<TSelf>; TBlockCombineHashedWrapper(TComputationMutables& mutables, @@ -508,6 +508,11 @@ public: , AggsParams_(std::move(aggsParams)) { MKQL_ENSURE(Width_ > 0, "Missing block length column"); + if constexpr (UseFilter) { + MKQL_ENSURE(filterColumn, "Missing filter column"); + } else { + MKQL_ENSURE(!filterColumn, "Unexpected filter column"); + } } EFetchResult DoCalculate(NUdf::TUnboxedValue& state, @@ -529,6 +534,23 @@ public: continue; } + const ui8* filterBitmap = nullptr; + if constexpr (UseFilter) { + auto filterDatum = TArrowBlock::From(s.Values_[*FilterColumn_]).GetDatum(); + if (filterDatum.is_scalar()) { + if (!filterDatum.template scalar_as<arrow::UInt8Scalar>().value) { + continue; + } + } else { + const auto& arr = filterDatum.array(); + filterBitmap = arr->template GetValues<ui8>(1); + ui64 popCount = GetBitmapPopCount(arr); + if (popCount == 0) { + continue; + } + } + } + s.HasValues_ = true; TVector<arrow::Datum> keysDatum; keysDatum.reserve(Keys_.size()); @@ -539,6 +561,12 @@ public: TOutputBuffer out; out.Resize(sizeof(TKey)); for (ui64 row = 0; row < batchLength; ++row) { + if constexpr (UseFilter) { + if (filterBitmap && !filterBitmap[row]) { + continue; + } + } + out.Rewind(); // encode key for (ui32 i = 0; i < keysDatum.size(); ++i) { @@ -742,7 +770,7 @@ void FillAggParams(TTupleLiteral* aggsVal, TTupleType* tupleType, TVector<TAggPa } } -template <bool UseSet> +template <bool UseSet, bool UseFilter> IComputationNode* MakeBlockCombineHashedWrapper( ui32 totalKeysSize, TComputationMutables& mutables, @@ -753,14 +781,14 @@ IComputationNode* MakeBlockCombineHashedWrapper( std::vector<std::unique_ptr<IKeySerializer>>&& keySerializers, TVector<TAggParams>&& aggsParams) { if (totalKeysSize <= sizeof(ui32)) { - return new TBlockCombineHashedWrapper<ui32, UseSet>(mutables, flow, filterColumn, width, keys, std::move(keySerializers), std::move(aggsParams)); + return new TBlockCombineHashedWrapper<ui32, UseSet, UseFilter>(mutables, flow, filterColumn, width, keys, std::move(keySerializers), std::move(aggsParams)); } if (totalKeysSize <= sizeof(ui64)) { - return new TBlockCombineHashedWrapper<ui64, UseSet>(mutables, flow, filterColumn, width, keys, std::move(keySerializers), std::move(aggsParams)); + return new TBlockCombineHashedWrapper<ui64, UseSet, UseFilter>(mutables, flow, filterColumn, width, keys, std::move(keySerializers), std::move(aggsParams)); } - return new TBlockCombineHashedWrapper<TSSOKey, UseSet>(mutables, flow, filterColumn, width, keys, std::move(keySerializers), std::move(aggsParams)); + return new TBlockCombineHashedWrapper<TSSOKey, UseSet, UseFilter>(mutables, flow, filterColumn, width, keys, std::move(keySerializers), std::move(aggsParams)); } } @@ -903,10 +931,18 @@ IComputationNode* WrapBlockCombineHashed(TCallable& callable, const TComputation } } - if (aggsParams.size() == 0) { - return MakeBlockCombineHashedWrapper<true>(totalKeysSize, ctx.Mutables, wideFlow, filterColumn, tupleType->GetElementsCount(), keys, std::move(keySerializers), std::move(aggsParams)); + if (filterColumn) { + if (aggsParams.size() == 0) { + return MakeBlockCombineHashedWrapper<true, true>(totalKeysSize, ctx.Mutables, wideFlow, filterColumn, tupleType->GetElementsCount(), keys, std::move(keySerializers), std::move(aggsParams)); + } else { + return MakeBlockCombineHashedWrapper<false, true>(totalKeysSize, ctx.Mutables, wideFlow, filterColumn, tupleType->GetElementsCount(), keys, std::move(keySerializers), std::move(aggsParams)); + } } else { - return MakeBlockCombineHashedWrapper<false>(totalKeysSize, ctx.Mutables, wideFlow, filterColumn, tupleType->GetElementsCount(), keys, std::move(keySerializers), std::move(aggsParams)); + if (aggsParams.size() == 0) { + return MakeBlockCombineHashedWrapper<true, false>(totalKeysSize, ctx.Mutables, wideFlow, filterColumn, tupleType->GetElementsCount(), keys, std::move(keySerializers), std::move(aggsParams)); + } else { + return MakeBlockCombineHashedWrapper<false, false>(totalKeysSize, ctx.Mutables, wideFlow, filterColumn, tupleType->GetElementsCount(), keys, std::move(keySerializers), std::move(aggsParams)); + } } } |