aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorvvvv <vvvv@ydb.tech>2022-11-22 22:40:45 +0300
committervvvv <vvvv@ydb.tech>2022-11-22 22:40:45 +0300
commit56595c60783bd80390b4d410a80f60a7969e2a15 (patch)
treed7a0a05a3874eb73ef73a25cd99a39c3847c9e3c
parent8ce89cc8436ea7052bbed327149cd6be12987f53 (diff)
downloadydb-56595c60783bd80390b4d410a80f60a7969e2a15.tar.gz
pushdown of filter into BlockCombineAll, supported for count(*)
-rw-r--r--ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.cpp60
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_block_agg.cpp16
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_block_agg_count.cpp11
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_block_agg_factory.h2
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_block_agg_minmax.cpp9
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_block_agg_sum.cpp9
6 files changed, 75 insertions, 32 deletions
diff --git a/ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.cpp b/ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.cpp
index 15ec02217e6..5d419bf8f7a 100644
--- a/ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.cpp
+++ b/ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.cpp
@@ -4859,6 +4859,34 @@ TExprNode::TPtr OptimizeSkipTakeToBlocks(const TExprNode::TPtr& node, TExprConte
.Build();
}
+TExprNode::TPtr UpdateBlockCombineAllColumns(const TExprNode::TPtr& node, std::optional<ui32> filterColumn, const TVector<ui32>& argIndices, TExprContext& ctx) {
+ auto combineChildren = node->ChildrenList();
+ combineChildren[0] = node->Head().HeadPtr();
+ combineChildren[1] = ctx.NewAtom(node->Pos(), ToString(argIndices[FromString<ui32>(combineChildren[1]->Content())]));
+ if (filterColumn) {
+ YQL_ENSURE(combineChildren[2]->IsCallable("Void"), "Filter column is already used");
+ combineChildren[2] = ctx.NewAtom(node->Pos(), ToString(*filterColumn));
+ } else {
+ if (!combineChildren[2]->IsCallable("Void")) {
+ combineChildren[2] = ctx.NewAtom(node->Pos(), ToString(argIndices[FromString<ui32>(combineChildren[2]->Content())]));
+ }
+ }
+
+ auto payloadNodes = combineChildren[3]->ChildrenList();
+ for (auto& p : payloadNodes) {
+ YQL_ENSURE(p->IsList() && p->ChildrenSize() >= 1 && p->Head().IsCallable("AggBlockApply"), "Expected AggBlockApply");
+ auto payloadArgs = p->ChildrenList();
+ for (ui32 i = 1; i < payloadArgs.size(); ++i) {
+ payloadArgs[i] = ctx.NewAtom(node->Pos(), ToString(argIndices[FromString<ui32>(payloadArgs[i]->Content())]));
+ }
+
+ p = ctx.ChangeChildren(*p, std::move(payloadArgs));
+ }
+
+ combineChildren[3] = ctx.ChangeChildren(*combineChildren[3], std::move(payloadNodes));
+ return ctx.ChangeChildren(*node, std::move(combineChildren));
+}
+
TExprNode::TPtr OptimizeBlockCombineAll(const TExprNode::TPtr& node, TExprContext& ctx, TTypeAnnotationContext& types) {
Y_UNUSED(types);
if (node->Head().IsCallable("WideMap")) {
@@ -4867,28 +4895,20 @@ TExprNode::TPtr OptimizeBlockCombineAll(const TExprNode::TPtr& node, TExprContex
bool onlyArguments = IsArgumentsOnlyLambda(lambda, argIndices);
if (onlyArguments) {
YQL_CLOG(DEBUG, CorePeepHole) << "Drop renaming WideMap under " << node->Content();
- // change # for all columns, including count/filter
- auto combineChildren = node->ChildrenList();
- combineChildren[0] = node->Head().HeadPtr();
- combineChildren[1] = ctx.NewAtom(node->Pos(), ToString(argIndices[FromString<ui32>(combineChildren[1]->Content())]));
- if (!combineChildren[2]->IsCallable("Void")) {
- combineChildren[2] = ctx.NewAtom(node->Pos(), ToString(argIndices[FromString<ui32>(combineChildren[2]->Content())]));
- }
-
- auto payloadNodes = combineChildren[3]->ChildrenList();
- for (auto& p : payloadNodes) {
- YQL_ENSURE(p->IsList() && p->ChildrenSize() >= 1 && p->Head().IsCallable("AggBlockApply"), "Expected AggBlockApply");
- auto payloadArgs = p->ChildrenList();
- for (ui32 i = 1; i < payloadArgs.size(); ++i) {
- payloadArgs[i] = ctx.NewAtom(node->Pos(), ToString(argIndices[FromString<ui32>(payloadArgs[i]->Content())]));
- }
-
- p = ctx.ChangeChildren(*p, std::move(payloadArgs));
- }
+ return UpdateBlockCombineAllColumns(node, {}, argIndices, ctx);
+ }
+ }
- combineChildren[3] = ctx.ChangeChildren(*combineChildren[3], std::move(payloadNodes));
- return ctx.ChangeChildren(*node, std::move(combineChildren));
+ if (node->Head().IsCallable("BlockCompress") && node->Child(2)->IsCallable("Void")) {
+ auto filterIndex = FromString<ui32>(node->Head().Child(1)->Content());
+ TVector<ui32> argIndices;
+ argIndices.resize(node->Head().GetTypeAnn()->Cast<TFlowExprType>()->GetItemType()->Cast<TMultiExprType>()->GetSize());
+ for (ui32 i = 0; i < argIndices.size(); ++i) {
+ argIndices[i] = (i < filterIndex) ? i : i + 1;
}
+
+ YQL_CLOG(DEBUG, CorePeepHole) << "Fuse " << node->Content() << " with " << node->Head().Content();
+ return UpdateBlockCombineAllColumns(node, filterIndex, argIndices, ctx);
}
return node;
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg.cpp
index 421b80dbcb1..f6acca37d35 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg.cpp
@@ -6,6 +6,8 @@
#include <ydb/library/yql/minikql/mkql_node_cast.h>
+#include <arrow/array/array_primitive.h>
+
namespace NKikimr {
namespace NMiniKQL {
@@ -53,10 +55,21 @@ public:
continue;
}
+ std::optional<ui64> filtered;
+ if (FilterColumn_) {
+ arrow::BooleanArray arr(TArrowBlock::From(s.Values_[*FilterColumn_]).GetDatum().array());
+ ui64 popCount = (ui64)arr.true_count();
+ if (popCount == 0) {
+ continue;
+ }
+
+ filtered = popCount;
+ }
+
s.HasValues_ = true;
for (size_t i = 0; i < s.Aggs_.size(); ++i) {
if (output[i]) {
- s.Aggs_[i]->AddMany(s.Values_.data(), batchLength);
+ s.Aggs_[i]->AddMany(s.Values_.data(), batchLength, filtered);
}
}
} else {
@@ -142,7 +155,6 @@ IComputationNode* WrapBlockCombineAll(TCallable& callable, const TComputationNod
filterColumn = AS_VALUE(TDataLiteral, filterColumnVal->GetItem())->AsValue().Get<ui32>();
}
- MKQL_ENSURE(!filterColumn, "Filter column is not supported yet");
auto aggsVal = AS_VALUE(TTupleLiteral, callable.GetInput(3));
TVector<TAggParams> aggsParams;
for (ui32 i = 0; i < aggsVal->GetValuesCount(); ++i) {
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_count.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_count.cpp
index 169e12a6556..c985b182206 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_count.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_count.cpp
@@ -10,9 +10,13 @@ public:
{
}
- void AddMany(const NUdf::TUnboxedValue* columns, ui64 batchLength) final {
+ void AddMany(const NUdf::TUnboxedValue* columns, ui64 batchLength, std::optional<ui64> filtered) final {
Y_UNUSED(columns);
- State_ += batchLength;
+ if (filtered) {
+ State_ += *filtered;
+ } else {
+ State_ += batchLength;
+ }
}
NUdf::TUnboxedValue Finish() final {
@@ -31,7 +35,8 @@ public:
{
}
- void AddMany(const NUdf::TUnboxedValue* columns, ui64 batchLength) final {
+ void AddMany(const NUdf::TUnboxedValue* columns, ui64 batchLength, std::optional<ui64> filtered) final {
+ Y_ENSURE(!filtered);
const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum();
if (datum.is_scalar()) {
if (datum.scalar()->is_valid) {
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_factory.h b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_factory.h
index a60b243e303..fc3a44270f4 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_factory.h
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_factory.h
@@ -10,7 +10,7 @@ class IBlockAggregator {
public:
virtual ~IBlockAggregator() = default;
- virtual void AddMany(const NUdf::TUnboxedValue* columns, ui64 batchLength) = 0;
+ virtual void AddMany(const NUdf::TUnboxedValue* columns, ui64 batchLength, std::optional<ui64> filtered) = 0;
virtual NUdf::TUnboxedValue Finish() = 0;
};
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_minmax.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_minmax.cpp
index a0fde1f70d7..9cea6d1fbc7 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_minmax.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_minmax.cpp
@@ -33,7 +33,8 @@ public:
}
}
- void AddMany(const NUdf::TUnboxedValue* columns, ui64 batchLength) final {
+ void AddMany(const NUdf::TUnboxedValue* columns, ui64 batchLength, std::optional<ui64> filtered) final {
+ Y_ENSURE(!filtered);
const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum();
if (datum.is_scalar()) {
if (datum.scalar()->is_valid) {
@@ -97,7 +98,8 @@ public:
}
}
- void AddMany(const NUdf::TUnboxedValue* columns, ui64 batchLength) final {
+ void AddMany(const NUdf::TUnboxedValue* columns, ui64 batchLength, std::optional<ui64> filtered) final {
+ Y_ENSURE(!filtered);
Y_UNUSED(batchLength);
const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum();
MKQL_ENSURE(datum.is_array(), "Expected array");
@@ -137,7 +139,8 @@ public:
}
}
- void AddMany(const NUdf::TUnboxedValue* columns, ui64 batchLength) final {
+ void AddMany(const NUdf::TUnboxedValue* columns, ui64 batchLength, std::optional<ui64> filtered) final {
+ Y_ENSURE(!filtered);
const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum();
if (datum.is_scalar()) {
if (datum.scalar()->is_valid) {
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_sum.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_sum.cpp
index 2fe4f0d0d18..dbf0d5a5edb 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_sum.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_sum.cpp
@@ -19,7 +19,8 @@ public:
{
}
- void AddMany(const NUdf::TUnboxedValue* columns, ui64 batchLength) final {
+ void AddMany(const NUdf::TUnboxedValue* columns, ui64 batchLength, std::optional<ui64> filtered) final {
+ Y_ENSURE(!filtered);
const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum();
if (datum.is_scalar()) {
if (datum.scalar()->is_valid) {
@@ -78,7 +79,8 @@ public:
{
}
- void AddMany(const NUdf::TUnboxedValue* columns, ui64 batchLength) final {
+ void AddMany(const NUdf::TUnboxedValue* columns, ui64 batchLength, std::optional<ui64> filtered) final {
+ Y_ENSURE(!filtered);
Y_UNUSED(batchLength);
const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum();
MKQL_ENSURE(datum.is_array(), "Expected array");
@@ -115,7 +117,8 @@ public:
{
}
- void AddMany(const NUdf::TUnboxedValue* columns, ui64 batchLength) final {
+ void AddMany(const NUdf::TUnboxedValue* columns, ui64 batchLength, std::optional<ui64> filtered) final {
+ Y_ENSURE(!filtered);
const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum();
if (datum.is_scalar()) {
if (datum.scalar()->is_valid) {