diff options
author | vokayndzop <[email protected]> | 2025-08-14 12:16:50 +0300 |
---|---|---|
committer | vokayndzop <[email protected]> | 2025-08-14 13:10:46 +0300 |
commit | 618abcfd3763bc44abbc9ef934a8b1d71dc9c674 (patch) | |
tree | 272ad8a5da9f5603ab59d41fcfcd93c7a5963e95 /yql/essentials/sql/v1/builtin.cpp | |
parent | bf42f3c2a0fe15d25acb32dd153f19102e205e0a (diff) |
Watermarks: add time extractor to HoppingWindow
commit_hash:d54496a7e3d9bd72e53ce880691a59490a34a138
Diffstat (limited to 'yql/essentials/sql/v1/builtin.cpp')
-rw-r--r-- | yql/essentials/sql/v1/builtin.cpp | 109 |
1 files changed, 68 insertions, 41 deletions
diff --git a/yql/essentials/sql/v1/builtin.cpp b/yql/essentials/sql/v1/builtin.cpp index d2a368be60f..c660d35817c 100644 --- a/yql/essentials/sql/v1/builtin.cpp +++ b/yql/essentials/sql/v1/builtin.cpp @@ -2152,30 +2152,35 @@ private: TNodePtr Node_; }; -THoppingWindow::THoppingWindow(TPosition pos, const TVector<TNodePtr>& args) +THoppingWindow::THoppingWindow(TPosition pos, TVector<TNodePtr> args) : INode(pos) - , Args_(args) + , Args_(std::move(args)) , FakeSource_(BuildFakeSource(pos)) , Valid_(false) {} -void THoppingWindow::MarkValid() { - YQL_ENSURE(!HasState(ENodeState::Initialized)); - Valid_ = true; -} - TNodePtr THoppingWindow::BuildTraits(const TString& label) const { YQL_ENSURE(HasState(ENodeState::Initialized)); return Y( "HoppingTraits", Y("ListItemType", Y("TypeOf", label)), - BuildLambda(Pos_, Y("row"), Y("Just", Y("SystemMetadata", Y("String", Q("write_time")), Y("DependsOn", "row")))), - Hop, - Interval, - Interval, - Q("true"), - Q("v2")); + BuildLambda(Pos_, Y("row"), TimeExtractor_), + Hop_, + Interval_, + Delay_, + Q(DataWatermarks_), + Q("v2") + ); +} + +TNodePtr THoppingWindow::GetInterval() const { + return Interval_; +} + +void THoppingWindow::MarkValid() { + YQL_ENSURE(!HasState(ENodeState::Initialized)); + Valid_ = true; } bool THoppingWindow::DoInit(TContext& ctx, ISource* src) { @@ -2184,8 +2189,8 @@ bool THoppingWindow::DoInit(TContext& ctx, ISource* src) { return false; } - if (!(Args_.size() == 2)) { - ctx.Error(Pos_) << "HoppingWindow requires two arguments"; + if (Args_.size() != 3) { + ctx.Error(Pos_) << "HoppingWindow requires three arguments"; return false; } @@ -2194,14 +2199,19 @@ bool THoppingWindow::DoInit(TContext& ctx, ISource* src) { return false; } - auto hopExpr = Args_[0]; - auto intervalExpr = Args_[1]; - if (!(hopExpr->Init(ctx, FakeSource_.Get()) && intervalExpr->Init(ctx, FakeSource_.Get()))) { + auto timeExtractor = Args_[0]; + auto hopExpr = Args_[1]; + auto intervalExpr = Args_[2]; + + if (!timeExtractor->Init(ctx, src) || + !hopExpr->Init(ctx, FakeSource_.Get()) || + !intervalExpr->Init(ctx, FakeSource_.Get())) { return false; } - Hop = ProcessIntervalParam(hopExpr); - Interval = ProcessIntervalParam(intervalExpr); + TimeExtractor_ = timeExtractor; + Hop_ = ProcessIntervalParam(hopExpr); + Interval_ = ProcessIntervalParam(intervalExpr); return true; } @@ -2613,54 +2623,71 @@ private: TString Mode_; }; -template <bool IsStart> -class THoppingTime final: public TAstListNode { +template<bool IsStart> +class THoppingTime final : public TAstListNode { public: - THoppingTime(TPosition pos, const TVector<TNodePtr>& args = {}) + THoppingTime(TPosition pos, TVector<TNodePtr> args) : TAstListNode(pos) - { - Y_UNUSED(args); - } + , Args_(std::move(args)) + {} private: - TNodePtr DoClone() const override { - return new THoppingTime(GetPos()); - } - bool DoInit(TContext& ctx, ISource* src) override { - Y_UNUSED(ctx); + if (!src || src->IsFake()) { + ctx.Error(Pos_) << GetOpName() << " requires data source"; + return false; + } + + if (Args_.size() > 0) { + ctx.Error(Pos_) << GetOpName() << " requires exactly 0 arguments"; + return false; + } auto legacySpec = src->GetLegacyHoppingWindowSpec(); auto spec = src->GetHoppingWindowSpec(); if (!legacySpec && !spec) { - ctx.Error(Pos_) << "No hopping window parameters in aggregation"; + if (src->HasAggregations()) { + ctx.Error(Pos_) << GetOpName() << " can not be used here: HoppingWindow specification is missing in GROUP BY"; + } else { + ctx.Error(Pos_) << GetOpName() << " can not be used without aggregation by HoppingWindow"; + } return false; } - Nodes_.clear(); - const auto fieldName = legacySpec ? "_yql_time" : spec->GetLabel(); - const auto interval = legacySpec + if constexpr (IsStart) { + const auto interval = legacySpec ? legacySpec->Interval - : dynamic_cast<THoppingWindow*>(spec.Get())->Interval; + : dynamic_cast<THoppingWindow*>(spec.Get())->GetInterval(); - if (!IsStart) { + Add("Sub", + Y("Member", "row", Q(fieldName)), + interval + ); + } else { Add("Member", "row", Q(fieldName)); - return true; } - Add("Sub", - Y("Member", "row", Q(fieldName)), - interval); return true; } void DoUpdateState() const override { State_.Set(ENodeState::Aggregated, true); } + + TNodePtr DoClone() const override { + return new THoppingTime<IsStart>(Pos_, CloneContainer(Args_)); + } + + TString GetOpName() const override { + return IsStart ? "HopStart" : "HopEnd"; + } + +private: + TVector<TNodePtr> Args_; }; class TInvalidBuiltin final: public INode { |