about | summary | refs | log | tree | commit | diff | stats
diff options
context:
space:
mode:
author vvvv <vvvv@ydb.tech> 2022-12-13 12:21:02 +0300
committer vvvv <vvvv@ydb.tech> 2022-12-13 12:21:02 +0300
commit 1cb79b33514086fa93c2d365e17f961e3ae6ab84 (patch)
tree fc15bad1004fcf6efdaeb3a70ef57707e76f891f
parent d03c8226fc53c73dc64292d05366de418d8fb52a (diff)
download ydb-1cb79b33514086fa93c2d365e17f961e3ae6ab84.tar.gz
pushdown of filter into hashed combiner
-rw-r--r-- ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.cpp 28
-rw-r--r-- ydb/library/yql/minikql/comp_nodes/mkql_block_agg.cpp 56
2 files changed, 66 insertions, 18 deletions
diff --git a/ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.cpp b/ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.cpp
index 7e879b086ea..20d6fae8d5c 100644
--- a/ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.cpp
+++ b/ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.cpp
@@ -4881,7 +4881,7 @@ TExprNode::TPtr OptimizeSkipTakeToBlocks(const TExprNode::TPtr& node, TExprConte
.Build();
}
-TExprNode::TPtr UpdateBlockCombineAllColumns(const TExprNode::TPtr& node, std::optional<ui32> filterColumn, const TVector<ui32>& argIndices, TExprContext& ctx) {
+TExprNode::TPtr UpdateBlockCombineColumns(const TExprNode::TPtr& node, std::optional<ui32> filterColumn, const TVector<ui32>& argIndices, TExprContext& ctx) {
auto combineChildren = node->ChildrenList();
combineChildren[0] = node->Head().HeadPtr();
if (filterColumn) {
@@ -4889,11 +4889,22 @@ TExprNode::TPtr UpdateBlockCombineAllColumns(const TExprNode::TPtr& node, std::o
combineChildren[1] = ctx.NewAtom(node->Pos(), ToString(*filterColumn));
} else {
if (!combineChildren[1]->IsCallable("Void")) {
- combineChildren[1] = ctx.NewAtom(node->Pos(), ToString(argIndices[FromString<ui32>(combineChildren[2]->Content())]));
+ combineChildren[1] = ctx.NewAtom(node->Pos(), ToString(argIndices[FromString<ui32>(combineChildren[1]->Content())]));
}
}
- auto payloadNodes = combineChildren[2]->ChildrenList();
+ const bool hashed = node->Content().EndsWith("Hashed");
+ if (hashed) {
+ auto keyNodes = combineChildren[2]->ChildrenList();
+ for (auto& p : keyNodes) {
+ p = ctx.NewAtom(node->Pos(), ToString(argIndices[FromString<ui32>(p->Content())]));
+ }
+
+ combineChildren[2] = ctx.ChangeChildren(*combineChildren[2], std::move(keyNodes));
+ }
+
+ auto payloadIndex = hashed ? 3 : 2;
+ auto payloadNodes = combineChildren[payloadIndex]->ChildrenList();
for (auto& p : payloadNodes) {
YQL_ENSURE(p->IsList() && p->ChildrenSize() >= 1 && p->Head().IsCallable("AggBlockApply"), "Expected AggBlockApply");
auto payloadArgs = p->ChildrenList();
@@ -4904,11 +4915,11 @@ TExprNode::TPtr UpdateBlockCombineAllColumns(const TExprNode::TPtr& node, std::o
p = ctx.ChangeChildren(*p, std::move(payloadArgs));
}
- combineChildren[2] = ctx.ChangeChildren(*combineChildren[2], std::move(payloadNodes));
+ combineChildren[payloadIndex] = ctx.ChangeChildren(*combineChildren[payloadIndex], std::move(payloadNodes));
return ctx.ChangeChildren(*node, std::move(combineChildren));
}
-TExprNode::TPtr OptimizeBlockCombineAll(const TExprNode::TPtr& node, TExprContext& ctx, TTypeAnnotationContext& types) {
+TExprNode::TPtr OptimizeBlockCombine(const TExprNode::TPtr& node, TExprContext& ctx, TTypeAnnotationContext& types) {
Y_UNUSED(types);
if (node->Head().IsCallable("WideMap")) {
const auto& lambda = node->Head().Tail();
@@ -4916,7 +4927,7 @@ TExprNode::TPtr OptimizeBlockCombineAll(const TExprNode::TPtr& node, TExprContex
bool onlyArguments = IsArgumentsOnlyLambda(lambda, argIndices);
if (onlyArguments) {
YQL_CLOG(DEBUG, CorePeepHole) << "Drop renaming WideMap under " << node->Content();
- return UpdateBlockCombineAllColumns(node, {}, argIndices, ctx);
+ return UpdateBlockCombineColumns(node, {}, argIndices, ctx);
}
}
@@ -4929,7 +4940,7 @@ TExprNode::TPtr OptimizeBlockCombineAll(const TExprNode::TPtr& node, TExprContex
}
YQL_CLOG(DEBUG, CorePeepHole) << "Fuse " << node->Content() << " with " << node->Head().Content();
- return UpdateBlockCombineAllColumns(node, filterIndex, argIndices, ctx);
+ return UpdateBlockCombineColumns(node, filterIndex, argIndices, ctx);
}
return node;
@@ -6713,7 +6724,8 @@ struct TPeepHoleRules {
{"WideToBlocks", &OptimizeWideToBlocks},
{"Skip", &OptimizeSkipTakeToBlocks},
{"Take", &OptimizeSkipTakeToBlocks},
- {"BlockCombineAll", &OptimizeBlockCombineAll},
+ {"BlockCombineAll", &OptimizeBlockCombine},
+ {"BlockCombineHashed", &OptimizeBlockCombine},
};
TPeepHoleRules()
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg.cpp
index 729d014c765..5ffdb8ad4a9 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg.cpp
@@ -485,10 +485,10 @@ TStringBuf GetKeyView(const TSSOKey& key) {
return key.AsView();
}
-template <typename TKey, bool UseSet>
-class TBlockCombineHashedWrapper : public TStatefulWideFlowComputationNode<TBlockCombineHashedWrapper<TKey, UseSet>> {
+template <typename TKey, bool UseSet, bool UseFilter>
+class TBlockCombineHashedWrapper : public TStatefulWideFlowComputationNode<TBlockCombineHashedWrapper<TKey, UseSet, UseFilter>> {
public:
- using TSelf = TBlockCombineHashedWrapper<TKey, UseSet>;
+ using TSelf = TBlockCombineHashedWrapper<TKey, UseSet, UseFilter>;
using TBase = TStatefulWideFlowComputationNode<TSelf>;
TBlockCombineHashedWrapper(TComputationMutables& mutables,
@@ -508,6 +508,11 @@ public:
, AggsParams_(std::move(aggsParams))
{
MKQL_ENSURE(Width_ > 0, "Missing block length column");
+ if constexpr (UseFilter) {
+ MKQL_ENSURE(filterColumn, "Missing filter column");
+ } else {
+ MKQL_ENSURE(!filterColumn, "Unexpected filter column");
+ }
}
EFetchResult DoCalculate(NUdf::TUnboxedValue& state,
@@ -529,6 +534,23 @@ public:
continue;
}
+ const ui8* filterBitmap = nullptr;
+ if constexpr (UseFilter) {
+ auto filterDatum = TArrowBlock::From(s.Values_[*FilterColumn_]).GetDatum();
+ if (filterDatum.is_scalar()) {
+ if (!filterDatum.template scalar_as<arrow::UInt8Scalar>().value) {
+ continue;
+ }
+ } else {
+ const auto& arr = filterDatum.array();
+ filterBitmap = arr->template GetValues<ui8>(1);
+ ui64 popCount = GetBitmapPopCount(arr);
+ if (popCount == 0) {
+ continue;
+ }
+ }
+ }
+
s.HasValues_ = true;
TVector<arrow::Datum> keysDatum;
keysDatum.reserve(Keys_.size());
@@ -539,6 +561,12 @@ public:
TOutputBuffer out;
out.Resize(sizeof(TKey));
for (ui64 row = 0; row < batchLength; ++row) {
+ if constexpr (UseFilter) {
+ if (filterBitmap && !filterBitmap[row]) {
+ continue;
+ }
+ }
+
out.Rewind();
// encode key
for (ui32 i = 0; i < keysDatum.size(); ++i) {
@@ -742,7 +770,7 @@ void FillAggParams(TTupleLiteral* aggsVal, TTupleType* tupleType, TVector<TAggPa
}
}
-template <bool UseSet>
+template <bool UseSet, bool UseFilter>
IComputationNode* MakeBlockCombineHashedWrapper(
ui32 totalKeysSize,
TComputationMutables& mutables,
@@ -753,14 +781,14 @@ IComputationNode* MakeBlockCombineHashedWrapper(
std::vector<std::unique_ptr<IKeySerializer>>&& keySerializers,
TVector<TAggParams>&& aggsParams) {
if (totalKeysSize <= sizeof(ui32)) {
- return new TBlockCombineHashedWrapper<ui32, UseSet>(mutables, flow, filterColumn, width, keys, std::move(keySerializers), std::move(aggsParams));
+ return new TBlockCombineHashedWrapper<ui32, UseSet, UseFilter>(mutables, flow, filterColumn, width, keys, std::move(keySerializers), std::move(aggsParams));
}
if (totalKeysSize <= sizeof(ui64)) {
- return new TBlockCombineHashedWrapper<ui64, UseSet>(mutables, flow, filterColumn, width, keys, std::move(keySerializers), std::move(aggsParams));
+ return new TBlockCombineHashedWrapper<ui64, UseSet, UseFilter>(mutables, flow, filterColumn, width, keys, std::move(keySerializers), std::move(aggsParams));
}
- return new TBlockCombineHashedWrapper<TSSOKey, UseSet>(mutables, flow, filterColumn, width, keys, std::move(keySerializers), std::move(aggsParams));
+ return new TBlockCombineHashedWrapper<TSSOKey, UseSet, UseFilter>(mutables, flow, filterColumn, width, keys, std::move(keySerializers), std::move(aggsParams));
}
}
@@ -903,10 +931,18 @@ IComputationNode* WrapBlockCombineHashed(TCallable& callable, const TComputation
}
}
- if (aggsParams.size() == 0) {
- return MakeBlockCombineHashedWrapper<true>(totalKeysSize, ctx.Mutables, wideFlow, filterColumn, tupleType->GetElementsCount(), keys, std::move(keySerializers), std::move(aggsParams));
+ if (filterColumn) {
+ if (aggsParams.size() == 0) {
+ return MakeBlockCombineHashedWrapper<true, true>(totalKeysSize, ctx.Mutables, wideFlow, filterColumn, tupleType->GetElementsCount(), keys, std::move(keySerializers), std::move(aggsParams));
+ } else {
+ return MakeBlockCombineHashedWrapper<false, true>(totalKeysSize, ctx.Mutables, wideFlow, filterColumn, tupleType->GetElementsCount(), keys, std::move(keySerializers), std::move(aggsParams));
+ }
} else {
- return MakeBlockCombineHashedWrapper<false>(totalKeysSize, ctx.Mutables, wideFlow, filterColumn, tupleType->GetElementsCount(), keys, std::move(keySerializers), std::move(aggsParams));
+ if (aggsParams.size() == 0) {
+ return MakeBlockCombineHashedWrapper<true, false>(totalKeysSize, ctx.Mutables, wideFlow, filterColumn, tupleType->GetElementsCount(), keys, std::move(keySerializers), std::move(aggsParams));
+ } else {
+ return MakeBlockCombineHashedWrapper<false, false>(totalKeysSize, ctx.Mutables, wideFlow, filterColumn, tupleType->GetElementsCount(), keys, std::move(keySerializers), std::move(aggsParams));
+ }
}
}