summary | refs | log | tree | commit | diff | stats
path: root/yql/essentials/sql/v1/aggregation.cpp
diff options
context:
space:
mode:
author: mrlolthe1st <[email protected]> 2025-11-02 02:09:37 +0300
committer: mrlolthe1st <[email protected]> 2025-11-02 02:36:21 +0300
commit: 6679d28ead990c2d8e8178de71d57d6ea4566d2b (patch)
tree: 792809818b846d36aa878ae3b097a78a62dc09d0 /yql/essentials/sql/v1/aggregation.cpp
parent: 40582b2f612c6c599400f59676f27da4704136b8 (diff)
YQL-19673: Realization
commit_hash:68fcce49bf7355e49569a96ff7ad2faa537c563e
Diffstat (limited to 'yql/essentials/sql/v1/aggregation.cpp')
-rw-r--r-- yql/essentials/sql/v1/aggregation.cpp 81
1 files changed, 79 insertions, 2 deletions
diff --git a/yql/essentials/sql/v1/aggregation.cpp b/yql/essentials/sql/v1/aggregation.cpp
index e7a9d2a1fe9..fa8e5e6148f 100644
--- a/yql/essentials/sql/v1/aggregation.cpp
+++ b/yql/essentials/sql/v1/aggregation.cpp
@@ -50,8 +50,7 @@ public:
bool multi = false, bool validateArgs = true)
: IAggregation(pos, name, func, aggMode)
, Factory_(!func.empty() ? BuildBind(Pos_, aggMode == EAggregateMode::OverWindow || aggMode == EAggregateMode::OverWindowDistinct ? "window_module" : "aggregate_module", func) : nullptr)
- ,
- Multi_(multi)
+ , Multi_(multi)
, ValidateArgs_(validateArgs)
, DynamicFactory_(!Factory_)
{
@@ -1140,6 +1139,84 @@ TAggregationPtr BuildTopFactoryAggregation(TPosition pos, const TString& name, c
template TAggregationPtr BuildTopFactoryAggregation<false>(TPosition pos, const TString& name, const TString& factory, EAggregateMode aggMode);
template TAggregationPtr BuildTopFactoryAggregation<true>(TPosition pos, const TString& name, const TString& factory, EAggregateMode aggMode);
+/// Aggregation node for the reservoir sampling functions.
+///
+/// The `isValue` constructor flag selects between two call shapes:
+///  - `isValue == true`:  no limit argument is accepted;
+///  - `isValue == false`: one extra trailing argument (the limit) is required
+///    and is appended to the factory `Apply` node.
+/// In both shapes the non-factory call additionally takes the aggregated
+/// expression as its first argument.
+class TReservoirSamplingAggregationFactory final: public TAggregationFactory {
+public:
+    TReservoirSamplingAggregationFactory(TPosition pos, const TString& name, const TString& factory, EAggregateMode aggMode, bool isValue)
+        : TAggregationFactory(pos, name, factory, aggMode)
+        , FakeSource_(BuildFakeSource(pos))
+        , IsValue_(isValue)
+    {
+    }
+
+private:
+    /// Validates the argument count, initializes the limit (if any) and the
+    /// aggregated expression, and for a non-factory call adds the
+    /// `Member row <name>` reference to `node`.
+    ///
+    /// Expected `exprs` layout:
+    ///   factory call:      [limit]        (limit only when !IsValue_)
+    ///   non-factory call:  expr, [limit]  (limit only when !IsValue_)
+    ///
+    /// Returns false when validation or initialization fails; the argument
+    /// count mismatch is reported through `ctx.Error`.
+    bool InitAggr(TContext& ctx, bool isFactory, ISource* src, TAstListNode& node, const TVector<TNodePtr>& exprs) final {
+        const ui32 expectedArgsCount = (isFactory ? 0 : 1) + (IsValue_ ? 0 : 1);
+        if (exprs.size() != expectedArgsCount) {
+            ctx.Error(Pos_) << "Reservoir Sampling aggregation " << (isFactory ? "factory " : "") << "function requires exactly " << expectedArgsCount << " arguments, given: " << exprs.size();
+            return false;
+        }
+
+        if (BlockWindowAggregationWithoutFrameSpec(Pos_, GetName(), src, ctx)) {
+            return false;
+        }
+
+        Limit_ = nullptr;
+        if (!IsValue_) {
+            // The limit is always the LAST argument: index 1 for a regular
+            // call, but index 0 for a factory call (where exprs.size() == 1).
+            // Indexing with a fixed [1] would read past the end in factory mode.
+            Limit_ = exprs.back();
+            // The limit is initialized against the fake source rather than `src`
+            // (presumably so that it cannot depend on row columns - TODO confirm).
+            if (!Limit_->Init(ctx, FakeSource_.Get())) {
+                return false;
+            }
+        }
+
+        if (!isFactory) {
+            Expr_ = exprs[0];
+            Name_ = src->MakeLocalName(Name_);
+        }
+
+        if (!Init(ctx, src)) {
+            return false;
+        }
+
+        if (!isFactory) {
+            node.Add("Member", "row", Q(Name_));
+            if (IsOverWindow() || IsOverWindowDistinct()) {
+                src->AddTmpWindowColumn(Name_);
+            }
+        }
+
+        return true;
+    }
+
+    /// Creates a fresh node from the constructor arguments only; `Limit_` is
+    /// not copied and is set again by `InitAggr` on the clone.
+    TNodePtr DoClone() const final {
+        return new TReservoirSamplingAggregationFactory(Pos_, Name_, Func_, AggMode_, IsValue_);
+    }
+
+    /// Builds `(Apply Factory_ type (lambda (row) expr))`, where `expr` is
+    /// `Expr_` wrapped in `Unwrap` when `many` is set, followed by the factory
+    /// arguments added by `AddFactoryArguments`.
+    TNodePtr GetApply(const TNodePtr& type, bool many, bool allowAggApply, TContext& ctx) const final {
+        Y_UNUSED(ctx);
+        Y_UNUSED(allowAggApply);
+        auto apply = Y("Apply", Factory_, type, BuildLambda(Pos_, Y("row"), many ? Y("Unwrap", Expr_) : Expr_));
+        AddFactoryArguments(apply);
+        return apply;
+    }
+
+    /// Appends the limit to `apply`; a no-op when `IsValue_` is set.
+    void AddFactoryArguments(TNodePtr& apply) const final {
+        if (IsValue_) {
+            return;
+        }
+        apply = L(apply, Limit_);
+    }
+
+private:
+    TSourcePtr FakeSource_; // source used only to initialize Limit_
+    TNodePtr Limit_;        // trailing limit argument; null when IsValue_ is set
+    bool IsValue_;          // true: no limit argument is accepted
+};
+
+/// Creates a reservoir sampling aggregation node; all arguments are forwarded
+/// unchanged to the TReservoirSamplingAggregationFactory constructor.
+TAggregationPtr BuildReservoirSamplingFactoryAggregation(TPosition pos, const TString& name, const TString& factory, EAggregateMode aggMode, bool isValue) {
+    TAggregationPtr aggregation = new TReservoirSamplingAggregationFactory(pos, name, factory, aggMode, isValue);
+    return aggregation;
+}
+
class TCountDistinctEstimateAggregationFactory final: public TAggregationFactory {
public:
TCountDistinctEstimateAggregationFactory(TPosition pos, const TString& name, const TString& factory, EAggregateMode aggMode)