diff options
author | vvvv <vvvv@ydb.tech> | 2022-10-21 21:43:48 +0300 |
---|---|---|
committer | vvvv <vvvv@ydb.tech> | 2022-10-21 21:43:48 +0300 |
commit | 1f1966ecbf022c02cbfd0afbbdb3c6aafec6fbb3 (patch) | |
tree | 92ba4b0261123ee21ef2cd8eb966edd5d6967394 | |
parent | 460b29a4016d8792c696cc3fc7151556b9e27acd (diff) | |
download | ydb-1f1966ecbf022c02cbfd0afbbdb3c6aafec6fbb3.tar.gz |
[dq] high level rewrite for Length, fix for empty wide blocks
-rw-r--r-- | ydb/library/yql/core/yql_aggregate_expander.cpp | 2 | ||||
-rw-r--r-- | ydb/library/yql/dq/opt/dq_opt_phy.cpp | 118 | ||||
-rw-r--r-- | ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp | 8 |
3 files changed, 52 insertions, 76 deletions
diff --git a/ydb/library/yql/core/yql_aggregate_expander.cpp b/ydb/library/yql/core/yql_aggregate_expander.cpp index e837e4de38..cfbcc4d5f9 100644 --- a/ydb/library/yql/core/yql_aggregate_expander.cpp +++ b/ydb/library/yql/core/yql_aggregate_expander.cpp @@ -564,7 +564,7 @@ TExprNode::TPtr TAggregateExpander::TryGenerateBlockCombineAll() { extractorRoots.push_back(root); } - outputColumns.push_back(TString(InitialColumnNames[index]->Content())); + outputColumns.push_back(TString(FinalColumnNames[index]->Content())); } auto mappedWidth = extractorRoots.size(); diff --git a/ydb/library/yql/dq/opt/dq_opt_phy.cpp b/ydb/library/yql/dq/opt/dq_opt_phy.cpp index 4bff46ae6b..48be6f7cd6 100644 --- a/ydb/library/yql/dq/opt/dq_opt_phy.cpp +++ b/ydb/library/yql/dq/opt/dq_opt_phy.cpp @@ -1655,88 +1655,64 @@ TExprBase DqRewriteLengthOfStageOutputLegacy(TExprBase node, TExprContext& ctx, auto field = BuildAtom("_dq_agg_cnt", node.Pos(), ctx); - auto combine = Build<TCoCombineByKey>(ctx, node.Pos()) - .Input(dqUnion) - .PreMapLambda() - .Args({"item"}) - .Body<TCoJust>() - .Input("item") + auto aggregateCombine = Build<TCoAggregateCombine>(ctx, node.Pos()) + .Input(dqUnion) + .Keys() + .Build() + .Handlers() + .Add<TCoAggregateTuple>() + .ColumnName(field) + .Trait<TCoAggApply>() + .Name<TCoAtom>() + .Value("count_all") .Build() - .Build() - .KeySelectorLambda() - .Args({"item"}) - .Body(zero) - .Build() - .InitHandlerLambda() - .Args({"key", "item"}) - .Body<TCoUint64>() - .Literal().Build("1") - .Build() - .Build() - .UpdateHandlerLambda() - .Args({"key", "item", "state"}) - .Body<TCoInc>() - .Value("state") - .Build() - .Build() - .FinishHandlerLambda() - .Args({"key", "state"}) - .Body<TCoJust>() - .Input<TCoAsStruct>() - .Add<TCoNameValueTuple>() - .Name(field) - .Value("state") - .Build() + .InputType(ExpandType(node.Pos(), *GetSeqItemType(dqUnion.Raw()->GetTypeAnn()), ctx)) + .Extractor<TCoLambda>() + .Args({ "row" }) + .Body<TCoVoid>() .Build() .Build() .Build() - .Done(); + .Build() + .Build() + .Settings() + .Build() + .Done(); - const auto stub = MakeBool<false>(node.Pos(), ctx); + TVector<const TItemExprType*> stateItems = { + ctx.MakeType<TItemExprType>("_dq_agg_cnt", ctx.MakeType<TDataExprType>(EDataSlot::Uint64)) + }; - auto partition = Build<TCoPartitionsByKeys>(ctx, node.Pos()) - .Input(combine) - .KeySelectorLambda() - .Args({"item"}) - .Body(stub) - .Build() - .SortDirections<TCoVoid>() - .Build() - .SortKeySelectorLambda<TCoVoid>() - .Build() - .ListHandlerLambda() - .Args({"list"}) - .Body<TCoCondense1>() - .Input("list") - .InitHandler(BuildIdentityLambda(node.Pos(), ctx)) // take struct from CombineByKey result - .SwitchHandler() - .Args({"item", "state"}) - .Body(stub) - .Build() - .UpdateHandler() - .Args({"item", "state"}) - .Body<TCoAsStruct>() - .Add<TCoNameValueTuple>() - .Name(field) - .Value<TCoAggrAdd>() - .Left<TCoMember>() - .Struct("state") - .Name(field) - .Build() - .Right<TCoMember>() - .Struct("item") - .Name(field) - .Build() - .Build() - .Build() - .Build() + auto stateRowType = ctx.MakeType<TStructExprType>(stateItems); + + auto aggregateFinal = Build<TCoAggregateMergeFinalize>(ctx, node.Pos()) + .Input(aggregateCombine) + .Keys() + .Build() + .Handlers() + .Add<TCoAggregateTuple>() + .ColumnName(field) + .Trait<TCoAggApplyState>() + .Name<TCoAtom>() + .Value("count_all") + .Build() + .InputType(ExpandType(node.Pos(), *stateRowType, ctx)) + .Extractor<TCoLambda>() + .Args({ "row" }) + .Body<TCoMember>() + .Struct("row") + .Name(field) .Build() .Build() .Build() - .Done(); + .Build() + .Build() + .Settings() + .Build() + .Done(); auto toOptional = Build<TCoToOptional>(ctx, node.Pos()) - .List(partition) + .List(aggregateFinal) .Done(); auto coalesce = Build<TCoCoalesce>(ctx, node.Pos()) diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp index 52c414a77f..e5910b7c4e 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp @@ -17,6 +17,8 @@ namespace NMiniKQL { namespace { +constexpr size_t MaxBlockSizeInBytes = 1_MB; + class TBlockBuilderBase { public: TBlockBuilderBase(TComputationContext& ctx, const std::shared_ptr<arrow::DataType>& itemType) @@ -40,8 +42,6 @@ private: return arrow::BitUtil::BytesForBits(bits); } - static constexpr size_t MaxBlockSizeInBytes = 1_MB; - protected: TComputationContext& Ctx_; const std::shared_ptr<arrow::DataType> ItemType_; @@ -222,8 +222,8 @@ private: Builders_.push_back(MakeBlockBuilder(ctx, slots[i])); } - MaxLength_ = Builders_.front()->MaxLength(); - for (size_t i = 1; i < slots.size(); ++i) { + MaxLength_ = MaxBlockSizeInBytes; + for (size_t i = 0; i < slots.size(); ++i) { MaxLength_ = Min(MaxLength_, Builders_[i]->MaxLength()); } } |