diff options
| author | mrlolthe1st <[email protected]> | 2025-11-02 02:09:37 +0300 |
|---|---|---|
| committer | mrlolthe1st <[email protected]> | 2025-11-02 02:36:21 +0300 |
| commit | 6679d28ead990c2d8e8178de71d57d6ea4566d2b (patch) | |
| tree | 792809818b846d36aa878ae3b097a78a62dc09d0 /yql/essentials/sql/v1/aggregation.cpp | |
| parent | 40582b2f612c6c599400f59676f27da4704136b8 (diff) | |
YQL-19673: Realization
commit_hash:68fcce49bf7355e49569a96ff7ad2faa537c563e
Diffstat (limited to 'yql/essentials/sql/v1/aggregation.cpp')
| -rw-r--r-- | yql/essentials/sql/v1/aggregation.cpp | 81 |
1 files changed, 79 insertions, 2 deletions
diff --git a/yql/essentials/sql/v1/aggregation.cpp b/yql/essentials/sql/v1/aggregation.cpp index e7a9d2a1fe9..fa8e5e6148f 100644 --- a/yql/essentials/sql/v1/aggregation.cpp +++ b/yql/essentials/sql/v1/aggregation.cpp @@ -50,8 +50,7 @@ public: bool multi = false, bool validateArgs = true) : IAggregation(pos, name, func, aggMode) , Factory_(!func.empty() ? BuildBind(Pos_, aggMode == EAggregateMode::OverWindow || aggMode == EAggregateMode::OverWindowDistinct ? "window_module" : "aggregate_module", func) : nullptr) - , - Multi_(multi) + , Multi_(multi) , ValidateArgs_(validateArgs) , DynamicFactory_(!Factory_) { @@ -1140,6 +1139,84 @@ TAggregationPtr BuildTopFactoryAggregation(TPosition pos, const TString& name, c template TAggregationPtr BuildTopFactoryAggregation<false>(TPosition pos, const TString& name, const TString& factory, EAggregateMode aggMode); template TAggregationPtr BuildTopFactoryAggregation<true>(TPosition pos, const TString& name, const TString& factory, EAggregateMode aggMode); +class TReservoirSamplingAggregationFactory final: public TAggregationFactory { +public: + TReservoirSamplingAggregationFactory(TPosition pos, const TString& name, const TString& factory, EAggregateMode aggMode, bool isValue) + : TAggregationFactory(pos, name, factory, aggMode) + , FakeSource_(BuildFakeSource(pos)) + , IsValue_(isValue) + { + } + +private: + bool InitAggr(TContext& ctx, bool isFactory, ISource* src, TAstListNode& node, const TVector<TNodePtr>& exprs) final { + ui32 adjustArgsCount = (isFactory ? 0 : 1) + !IsValue_; + if (exprs.size() != adjustArgsCount) { + ctx.Error(Pos_) << "Reservoir Samplig aggregation " << (isFactory ? "factory " : "") << " function requires exactly " << adjustArgsCount << " arguments, given: " << exprs.size(); + return false; + } + + if (BlockWindowAggregationWithoutFrameSpec(Pos_, GetName(), src, ctx)) { + return false; + } + + Limit_ = nullptr; + if (!IsValue_) { + auto limitArgPos = exprs[1]->GetPos(); + Limit_ = exprs[1]; + if (!Limit_->Init(ctx, FakeSource_.Get())) { + return false; + } + } + + if (!isFactory) { + Expr_ = exprs[0]; + Name_ = src->MakeLocalName(Name_); + } + + if (!Init(ctx, src)) { + return false; + } + + if (!isFactory) { + node.Add("Member", "row", Q(Name_)); + if (IsOverWindow() || IsOverWindowDistinct()) { + src->AddTmpWindowColumn(Name_); + } + } + + return true; + } + + TNodePtr DoClone() const final { + return new TReservoirSamplingAggregationFactory(Pos_, Name_, Func_, AggMode_, IsValue_); + } + + TNodePtr GetApply(const TNodePtr& type, bool many, bool allowAggApply, TContext& ctx) const final { + Y_UNUSED(ctx); + Y_UNUSED(allowAggApply); + auto apply = Y("Apply", Factory_, type, BuildLambda(Pos_, Y("row"), many ? Y("Unwrap", Expr_) : Expr_)); + AddFactoryArguments(apply); + return apply; + } + + void AddFactoryArguments(TNodePtr& apply) const final { + if (IsValue_) { + return; + } + apply = L(apply, Limit_); + } + +private: + TSourcePtr FakeSource_; + TNodePtr Limit_; + bool IsValue_; +}; + +TAggregationPtr BuildReservoirSamplingFactoryAggregation(TPosition pos, const TString& name, const TString& factory, EAggregateMode aggMode, bool isValue) { + return new TReservoirSamplingAggregationFactory(pos, name, factory, aggMode, isValue); +} + class TCountDistinctEstimateAggregationFactory final: public TAggregationFactory { public: TCountDistinctEstimateAggregationFactory(TPosition pos, const TString& name, const TString& factory, EAggregateMode aggMode) |
