diff options
author | aneporada <aneporada@ydb.tech> | 2023-10-13 14:49:43 +0300 |
---|---|---|
committer | aneporada <aneporada@ydb.tech> | 2023-10-13 16:03:39 +0300 |
commit | 1e5b83881360787c84f0973321b51eb75f31703f (patch) | |
tree | 8225bbf32e9c9e2036cda6a1dff5eb11eb3041e1 | |
parent | 1a13cbc3fbd37fbec7302bc89de85f093bf07c85 (diff) | |
download | ydb-1e5b83881360787c84f0973321b51eb75f31703f.tar.gz |
Remove column-only requirement in PERCENTILE/MEDIAN
9 files changed, 108 insertions, 57 deletions
diff --git a/ydb/docs/ru/core/yql/reference/yql-core/builtins/_includes/aggregation/percentile_median.md b/ydb/docs/ru/core/yql/reference/yql-core/builtins/_includes/aggregation/percentile_median.md index 4b54735a58..048cffd767 100644 --- a/ydb/docs/ru/core/yql/reference/yql-core/builtins/_includes/aggregation/percentile_median.md +++ b/ydb/docs/ru/core/yql/reference/yql-core/builtins/_includes/aggregation/percentile_median.md @@ -11,12 +11,6 @@ MEDIAN(Interval? [, Double])->Interval? Подсчет процентилей по амортизированной версии алгоритма [TDigest](https://github.com/tdunning/t-digest). `MEDIAN` — алиас для `PERCENTILE(N, 0.5)`. -{% note info "Ограничение" %} - -Первый аргумент (N) должен быть именем колонки таблицы. Если это ограничение необходимо обойти, можно использовать подзапрос. Ограничение введено для упрощения вычислений, поскольку в реализации несколько вызовов с одинаковым первым аргументом (N) склеиваются в один проход. - -{% endnote %} - ``` yql SELECT MEDIAN(numeric_column), diff --git a/ydb/library/yql/core/common_opt/yql_co_simple2.cpp b/ydb/library/yql/core/common_opt/yql_co_simple2.cpp index 07450fcb18..ae1b037119 100644 --- a/ydb/library/yql/core/common_opt/yql_co_simple2.cpp +++ b/ydb/library/yql/core/common_opt/yql_co_simple2.cpp @@ -139,7 +139,6 @@ TExprNode::TPtr DeduplicateAggregateSameTraits(const TExprNode::TPtr& node, TExp .Build() .Value(); - YQL_CLOG(DEBUG, Core) << "Deduplicate " << node->Content() << " traits"; return ctx.Builder(self.Pos()) .Callable("Map") .Add(0, dedupedAggregate.Ptr()) @@ -208,6 +207,91 @@ TExprNode::TPtr DeduplicateAggregateSameTraits(const TExprNode::TPtr& node, TExp .Build(); } +TExprNode::TPtr MergeAggregateTraits(const TExprNode::TPtr& node, TExprContext& ctx) { + const TCoAggregate self(node); + using TMergeKey = std::pair<TExprNodeList, TStringBuf>; // all TCoAggregationTraits args (except finish), distinct column name + struct TCompareMergeKey { + bool operator()(const TMergeKey& a, const TMergeKey& b) const { + size_t len = std::min(a.first.size(), b.first.size()); + for (size_t i = 0; i < len; ++i) { + int cmp = CompareNodes(*a.first[i], *b.first[i]); + if (cmp) { + return cmp < 0; + } + } + if (a.first.size() != b.first.size()) { + return a.first.size() < b.first.size(); + } + return a.second < b.second; + } + }; + TExprNodeList resultAggTuples; + TMap<TMergeKey, TVector<TCoAggregateTuple>, TCompareMergeKey> tuplesByKey; + for (const auto& aggTuple : self.Handlers()) { + auto maybeAggTraits = aggTuple.Trait().Maybe<TCoAggregationTraits>(); + if (!maybeAggTraits || !maybeAggTraits.Cast().FinishHandler().Ref().IsComplete() || !aggTuple.ColumnName().Ref().IsAtom()) { + resultAggTuples.emplace_back(aggTuple.Ptr()); + continue; + } + + TExprNodeList aggTraits = maybeAggTraits.Cast().Ref().ChildrenList(); + YQL_ENSURE(aggTraits.size() > TCoAggregationTraits::idx_FinishHandler); + aggTraits.erase(aggTraits.begin() + TCoAggregationTraits::idx_FinishHandler); + + TStringBuf distinctKey; + if (aggTuple.DistinctName()) { + distinctKey = aggTuple.DistinctName().Cast().Value(); + } + + auto& tuples = tuplesByKey[TMergeKey{ std::move(aggTraits), distinctKey }]; + tuples.push_back(aggTuple); + } + + bool merged = false; + for (auto& it : tuplesByKey) { + auto& tuples = it.second; + if (tuples.size() == 1) { + resultAggTuples.push_back(tuples.front().Ptr()); + continue; + } + merged = true; + YQL_ENSURE(!tuples.empty()); + auto arg = ctx.NewArgument(tuples.front().Pos(), "arg"); + TExprNodeList bodyItems; + TExprNodeList columnNames; + + for (auto& tuple : tuples) { + bodyItems.push_back( + ctx.Builder(tuple.Trait().Cast<TCoAggregationTraits>().FinishHandler().Pos()) + .Apply(tuple.Trait().Cast<TCoAggregationTraits>().FinishHandler().Ref()) + .With(0, arg) + .Seal() + .Build() + ); + columnNames.push_back(tuple.ColumnName().Cast<TCoAtom>().Ptr()); + } + + auto newHandler = ctx.NewLambda(arg->Pos(), ctx.NewArguments(arg->Pos(), { arg }), ctx.NewList(arg->Pos(), std::move(bodyItems))); + auto newTraits = Build<TCoAggregationTraits>(ctx, tuples.front().Pos()) + .InitFrom(tuples.front().Trait().Cast<TCoAggregationTraits>()) + .FinishHandler(newHandler) + .Done().Ptr(); + auto newTuple = ctx.ChangeChild(tuples.front().Ref(), TCoAggregateTuple::idx_Trait, std::move(newTraits)); + newTuple = ctx.ChangeChild(*newTuple, TCoAggregateTuple::idx_ColumnName, ctx.NewList(tuples.front().Pos(), std::move(columnNames))); + resultAggTuples.push_back(std::move(newTuple)); + } + + if (!merged) { + return node; + } + + return Build<TCoAggregate>(ctx, node->Pos()) + .InitFrom(self) + .Handlers(ctx.NewList(self.Pos(), std::move(resultAggTuples))) + .Done() + .Ptr(); +} + TExprNode::TPtr SimplifySync(const TExprNode::TPtr& node, TExprContext& ctx) { TExprNode::TListType ordered; TNodeOnNodeOwnedMap realWorlds; @@ -480,7 +564,18 @@ void RegisterCoSimpleCallables2(TCallableOptimizerMap& map) { map["If"] = std::bind(&CheckIfWithSame, _1, _2); - map["Aggregate"] = std::bind(&DeduplicateAggregateSameTraits, _1, _2); + map["Aggregate"] = [](const TExprNode::TPtr& node, TExprContext& ctx, TOptimizeContext&) { + if (auto deduplicated = DeduplicateAggregateSameTraits(node, ctx); deduplicated != node) { + YQL_CLOG(DEBUG, Core) << "Deduplicate " << node->Content() << " traits"; + return deduplicated; + + } + if (auto merged = MergeAggregateTraits(node, ctx); merged != node) { + YQL_CLOG(DEBUG, Core) << "Merge aggregation traits in " << node->Content(); + return merged; + } + return node; + }; map["Xor"] = std::bind(&OptimizeXor, _1, _2); map["Not"] = std::bind(&OptimizeNot, _1, _2); diff --git a/ydb/library/yql/core/type_ann/type_ann_list.cpp b/ydb/library/yql/core/type_ann/type_ann_list.cpp index f2ee7155a6..e489732ccb 100644 --- a/ydb/library/yql/core/type_ann/type_ann_list.cpp +++ b/ydb/library/yql/core/type_ann/type_ann_list.cpp @@ -4965,7 +4965,7 @@ namespace { rowColumns.push_back(ctx.Expr.MakeType<TItemExprType>( child->Head().Child(index)->Content(), (isOptional || (input->Child(1)->ChildrenSize() == 0 && !isHopping)) && - item->GetKind() != ETypeAnnotationKind::Optional ? ctx.Expr.MakeType<TOptionalExprType>(item) : item)); + !item->IsOptionalOrNull() ? ctx.Expr.MakeType<TOptionalExprType>(item) : item)); } } else { const TTypeAnnotationNode* defValType; diff --git a/ydb/library/yql/sql/v1/aggregation.cpp b/ydb/library/yql/sql/v1/aggregation.cpp index 5cc5540685..84b36d7991 100644 --- a/ydb/library/yql/sql/v1/aggregation.cpp +++ b/ydb/library/yql/sql/v1/aggregation.cpp @@ -709,19 +709,6 @@ public: {} private: - const TString* GetGenericKey() const final { - return Column; - } - - void Join(IAggregation* aggr) final { - const auto percentile = dynamic_cast<TPercentileFactory*>(aggr); - Y_ABORT_UNLESS(percentile); - Y_ABORT_UNLESS(*Column == *percentile->Column); - Y_ABORT_UNLESS(AggMode == percentile->AggMode); - Percentiles.insert(percentile->Percentiles.cbegin(), percentile->Percentiles.cend()); - percentile->Percentiles.clear(); - } - bool InitAggr(TContext& ctx, bool isFactory, ISource* src, TAstListNode& node, const TVector<TNodePtr>& exprs) final { ui32 adjustArgsCount = isFactory ? 0 : 1; if (exprs.size() < 0 + adjustArgsCount || exprs.size() > 1 + adjustArgsCount) { @@ -730,14 +717,6 @@ private: return false; } - if (!isFactory) { - Column = exprs.front()->GetColumnName(); - if (!Column) { - ctx.Error(Pos) << Name << " may only be used with column reference as first argument."; - return false; - } - } - if (!TAggregationFactory::InitAggr(ctx, isFactory, src, node, isFactory ? TVector<TNodePtr>() : TVector<TNodePtr>(1, exprs.front()))) return false; @@ -827,7 +806,6 @@ private: TSourcePtr FakeSource; std::multimap<TString, TNodePtr> Percentiles; TNodePtr FactoryPercentile; - const TString* Column = nullptr; }; TAggregationPtr BuildPercentileFactoryAggregation(TPosition pos, const TString& name, const TString& factory, EAggregateMode aggMode) { diff --git a/ydb/library/yql/sql/v1/node.cpp b/ydb/library/yql/sql/v1/node.cpp index 8d3d996578..518e2a1b62 100644 --- a/ydb/library/yql/sql/v1/node.cpp +++ b/ydb/library/yql/sql/v1/node.cpp @@ -1284,14 +1284,6 @@ void IAggregation::DoUpdateState() const { State.Set(ENodeState::OverWindow, AggMode == EAggregateMode::OverWindow); } -const TString* IAggregation::GetGenericKey() const { - return nullptr; -} - -void IAggregation::Join(IAggregation*) { - Y_ABORT_UNLESS(false); -} - const TString& IAggregation::GetName() const { return Name; } diff --git a/ydb/library/yql/sql/v1/node.h b/ydb/library/yql/sql/v1/node.h index 0c2b6bd141..7eca6e9671 100644 --- a/ydb/library/yql/sql/v1/node.h +++ b/ydb/library/yql/sql/v1/node.h @@ -796,8 +796,6 @@ namespace NSQLTranslationV1 { void DoUpdateState() const override; - virtual const TString* GetGenericKey() const; - virtual bool InitAggr(TContext& ctx, bool isFactory, ISource* src, TAstListNode& node, const TVector<TNodePtr>& exprs) = 0; virtual std::pair<TNodePtr, bool> AggregationTraits(const TNodePtr& type, bool overState, bool many, bool allowAggApply, TContext& ctx) const; @@ -814,9 +812,6 @@ namespace NSQLTranslationV1 { EAggregateMode GetAggregationMode() const; void MarkKeyColumnAsGenerated(); - - virtual void Join(IAggregation* aggr); - private: virtual TNodePtr GetApply(const TNodePtr& type, bool many, bool allowAggApply, TContext& ctx) const = 0; diff --git a/ydb/library/yql/sql/v1/source.cpp b/ydb/library/yql/sql/v1/source.cpp index bf5fbbe979..7a00842ba0 100644 --- a/ydb/library/yql/sql/v1/source.cpp +++ b/ydb/library/yql/sql/v1/source.cpp @@ -552,19 +552,6 @@ std::pair<TNodePtr, bool> ISource::BuildAggregation(const TString& label, TConte keysTuple = L(keysTuple, BuildQuotedAtom(Pos, key)); } - std::map<std::pair<bool, TString>, std::vector<IAggregation*>> genericAggrs; - for (const auto& aggr: Aggregations) { - if (const auto key = aggr->GetGenericKey()) { - genericAggrs[{aggr->IsDistinct(), *key}].emplace_back(aggr.Get()); - } - } - - for (const auto& aggr : genericAggrs) { - for (size_t i = 1U; i < aggr.second.size(); ++i) { - aggr.second.front()->Join(aggr.second[i]); - } - } - const auto listType = Y("TypeOf", label); auto aggrArgs = Y(); const bool overState = GroupBySuffix == "CombineState" || GroupBySuffix == "MergeState" || diff --git a/ydb/library/yql/tests/sql/suites/aggregate/percentiles_grouped_expr.cfg b/ydb/library/yql/tests/sql/suites/aggregate/percentiles_grouped_expr.cfg new file mode 100644 index 0000000000..b0152e1177 --- /dev/null +++ b/ydb/library/yql/tests/sql/suites/aggregate/percentiles_grouped_expr.cfg @@ -0,0 +1,2 @@ +in Input percentiles_input.txt +udf stat_udf diff --git a/ydb/library/yql/tests/sql/suites/aggregate/percentiles_grouped_expr.sql b/ydb/library/yql/tests/sql/suites/aggregate/percentiles_grouped_expr.sql new file mode 100644 index 0000000000..1ad133feee --- /dev/null +++ b/ydb/library/yql/tests/sql/suites/aggregate/percentiles_grouped_expr.sql @@ -0,0 +1,8 @@ +select + median(val + 1) as med, + median(distinct val + 1) as distinct_med, + percentile(val + 1, 0.8) as p80 +from ( + select key, cast(value as int) as val from plato.Input +) +group by key order by med; |