aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorvvvv <vvvv@ydb.tech>2022-10-21 21:43:48 +0300
committervvvv <vvvv@ydb.tech>2022-10-21 21:43:48 +0300
commit1f1966ecbf022c02cbfd0afbbdb3c6aafec6fbb3 (patch)
tree92ba4b0261123ee21ef2cd8eb966edd5d6967394
parent460b29a4016d8792c696cc3fc7151556b9e27acd (diff)
downloadydb-1f1966ecbf022c02cbfd0afbbdb3c6aafec6fbb3.tar.gz
[dq] high level rewrite for Length, fix for empty wide blocks
-rw-r--r--ydb/library/yql/core/yql_aggregate_expander.cpp2
-rw-r--r--ydb/library/yql/dq/opt/dq_opt_phy.cpp118
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp8
3 files changed, 52 insertions, 76 deletions
diff --git a/ydb/library/yql/core/yql_aggregate_expander.cpp b/ydb/library/yql/core/yql_aggregate_expander.cpp
index e837e4de38..cfbcc4d5f9 100644
--- a/ydb/library/yql/core/yql_aggregate_expander.cpp
+++ b/ydb/library/yql/core/yql_aggregate_expander.cpp
@@ -564,7 +564,7 @@ TExprNode::TPtr TAggregateExpander::TryGenerateBlockCombineAll() {
extractorRoots.push_back(root);
}
- outputColumns.push_back(TString(InitialColumnNames[index]->Content()));
+ outputColumns.push_back(TString(FinalColumnNames[index]->Content()));
}
auto mappedWidth = extractorRoots.size();
diff --git a/ydb/library/yql/dq/opt/dq_opt_phy.cpp b/ydb/library/yql/dq/opt/dq_opt_phy.cpp
index 4bff46ae6b..48be6f7cd6 100644
--- a/ydb/library/yql/dq/opt/dq_opt_phy.cpp
+++ b/ydb/library/yql/dq/opt/dq_opt_phy.cpp
@@ -1655,88 +1655,64 @@ TExprBase DqRewriteLengthOfStageOutputLegacy(TExprBase node, TExprContext& ctx,
auto field = BuildAtom("_dq_agg_cnt", node.Pos(), ctx);
- auto combine = Build<TCoCombineByKey>(ctx, node.Pos())
- .Input(dqUnion)
- .PreMapLambda()
- .Args({"item"})
- .Body<TCoJust>()
- .Input("item")
+ auto aggregateCombine = Build<TCoAggregateCombine>(ctx, node.Pos())
+ .Input(dqUnion)
+ .Keys()
+ .Build()
+ .Handlers()
+ .Add<TCoAggregateTuple>()
+ .ColumnName(field)
+ .Trait<TCoAggApply>()
+ .Name<TCoAtom>()
+ .Value("count_all")
.Build()
- .Build()
- .KeySelectorLambda()
- .Args({"item"})
- .Body(zero)
- .Build()
- .InitHandlerLambda()
- .Args({"key", "item"})
- .Body<TCoUint64>()
- .Literal().Build("1")
- .Build()
- .Build()
- .UpdateHandlerLambda()
- .Args({"key", "item", "state"})
- .Body<TCoInc>()
- .Value("state")
- .Build()
- .Build()
- .FinishHandlerLambda()
- .Args({"key", "state"})
- .Body<TCoJust>()
- .Input<TCoAsStruct>()
- .Add<TCoNameValueTuple>()
- .Name(field)
- .Value("state")
- .Build()
+ .InputType(ExpandType(node.Pos(), *GetSeqItemType(dqUnion.Raw()->GetTypeAnn()), ctx))
+ .Extractor<TCoLambda>()
+ .Args({ "row" })
+ .Body<TCoVoid>()
.Build()
.Build()
.Build()
- .Done();
+ .Build()
+ .Build()
+ .Settings()
+ .Build()
+ .Done();
- const auto stub = MakeBool<false>(node.Pos(), ctx);
+ TVector<const TItemExprType*> stateItems = {
+ ctx.MakeType<TItemExprType>("_dq_agg_cnt", ctx.MakeType<TDataExprType>(EDataSlot::Uint64))
+ };
- auto partition = Build<TCoPartitionsByKeys>(ctx, node.Pos())
- .Input(combine)
- .KeySelectorLambda()
- .Args({"item"})
- .Body(stub)
- .Build()
- .SortDirections<TCoVoid>()
- .Build()
- .SortKeySelectorLambda<TCoVoid>()
- .Build()
- .ListHandlerLambda()
- .Args({"list"})
- .Body<TCoCondense1>()
- .Input("list")
- .InitHandler(BuildIdentityLambda(node.Pos(), ctx)) // take struct from CombineByKey result
- .SwitchHandler()
- .Args({"item", "state"})
- .Body(stub)
- .Build()
- .UpdateHandler()
- .Args({"item", "state"})
- .Body<TCoAsStruct>()
- .Add<TCoNameValueTuple>()
- .Name(field)
- .Value<TCoAggrAdd>()
- .Left<TCoMember>()
- .Struct("state")
- .Name(field)
- .Build()
- .Right<TCoMember>()
- .Struct("item")
- .Name(field)
- .Build()
- .Build()
- .Build()
- .Build()
+ auto stateRowType = ctx.MakeType<TStructExprType>(stateItems);
+
+ auto aggregateFinal = Build<TCoAggregateMergeFinalize>(ctx, node.Pos())
+ .Input(aggregateCombine)
+ .Keys()
+ .Build()
+ .Handlers()
+ .Add<TCoAggregateTuple>()
+ .ColumnName(field)
+ .Trait<TCoAggApplyState>()
+ .Name<TCoAtom>()
+ .Value("count_all")
+ .Build()
+ .InputType(ExpandType(node.Pos(), *stateRowType, ctx))
+ .Extractor<TCoLambda>()
+ .Args({ "row" })
+ .Body<TCoMember>()
+ .Struct("row")
+ .Name(field)
.Build()
.Build()
.Build()
- .Done();
+ .Build()
+ .Build()
+ .Settings()
+ .Build()
+ .Done();
auto toOptional = Build<TCoToOptional>(ctx, node.Pos())
- .List(partition)
+ .List(aggregateFinal)
.Done();
auto coalesce = Build<TCoCoalesce>(ctx, node.Pos())
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp
index 52c414a77f..e5910b7c4e 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp
@@ -17,6 +17,8 @@ namespace NMiniKQL {
namespace {
+constexpr size_t MaxBlockSizeInBytes = 1_MB;
+
class TBlockBuilderBase {
public:
TBlockBuilderBase(TComputationContext& ctx, const std::shared_ptr<arrow::DataType>& itemType)
@@ -40,8 +42,6 @@ private:
return arrow::BitUtil::BytesForBits(bits);
}
- static constexpr size_t MaxBlockSizeInBytes = 1_MB;
-
protected:
TComputationContext& Ctx_;
const std::shared_ptr<arrow::DataType> ItemType_;
@@ -222,8 +222,8 @@ private:
Builders_.push_back(MakeBlockBuilder(ctx, slots[i]));
}
- MaxLength_ = Builders_.front()->MaxLength();
- for (size_t i = 1; i < slots.size(); ++i) {
+ MaxLength_ = MaxBlockSizeInBytes;
+ for (size_t i = 0; i < slots.size(); ++i) {
MaxLength_ = Min(MaxLength_, Builders_[i]->MaxLength());
}
}