summaryrefslogtreecommitdiffstats
path: root/yql/essentials/sql/v1/builtin.cpp
diff options
context:
space:
mode:
authorvokayndzop <[email protected]>2025-08-14 12:16:50 +0300
committervokayndzop <[email protected]>2025-08-14 13:10:46 +0300
commit618abcfd3763bc44abbc9ef934a8b1d71dc9c674 (patch)
tree272ad8a5da9f5603ab59d41fcfcd93c7a5963e95 /yql/essentials/sql/v1/builtin.cpp
parentbf42f3c2a0fe15d25acb32dd153f19102e205e0a (diff)
Watermarks: add time extractor to HoppingWindow
commit_hash:d54496a7e3d9bd72e53ce880691a59490a34a138
Diffstat (limited to 'yql/essentials/sql/v1/builtin.cpp')
-rw-r--r--yql/essentials/sql/v1/builtin.cpp109
1 files changed, 68 insertions, 41 deletions
diff --git a/yql/essentials/sql/v1/builtin.cpp b/yql/essentials/sql/v1/builtin.cpp
index d2a368be60f..c660d35817c 100644
--- a/yql/essentials/sql/v1/builtin.cpp
+++ b/yql/essentials/sql/v1/builtin.cpp
@@ -2152,30 +2152,35 @@ private:
TNodePtr Node_;
};
-THoppingWindow::THoppingWindow(TPosition pos, const TVector<TNodePtr>& args)
+THoppingWindow::THoppingWindow(TPosition pos, TVector<TNodePtr> args)
: INode(pos)
- , Args_(args)
+ , Args_(std::move(args))
, FakeSource_(BuildFakeSource(pos))
, Valid_(false)
{}
-void THoppingWindow::MarkValid() {
- YQL_ENSURE(!HasState(ENodeState::Initialized));
- Valid_ = true;
-}
-
TNodePtr THoppingWindow::BuildTraits(const TString& label) const {
YQL_ENSURE(HasState(ENodeState::Initialized));
return Y(
"HoppingTraits",
Y("ListItemType", Y("TypeOf", label)),
- BuildLambda(Pos_, Y("row"), Y("Just", Y("SystemMetadata", Y("String", Q("write_time")), Y("DependsOn", "row")))),
- Hop,
- Interval,
- Interval,
- Q("true"),
- Q("v2"));
+ BuildLambda(Pos_, Y("row"), TimeExtractor_),
+ Hop_,
+ Interval_,
+ Delay_,
+ Q(DataWatermarks_),
+ Q("v2")
+ );
+}
+
+TNodePtr THoppingWindow::GetInterval() const {
+ return Interval_;
+}
+
+void THoppingWindow::MarkValid() {
+ YQL_ENSURE(!HasState(ENodeState::Initialized));
+ Valid_ = true;
}
bool THoppingWindow::DoInit(TContext& ctx, ISource* src) {
@@ -2184,8 +2189,8 @@ bool THoppingWindow::DoInit(TContext& ctx, ISource* src) {
return false;
}
- if (!(Args_.size() == 2)) {
- ctx.Error(Pos_) << "HoppingWindow requires two arguments";
+ if (Args_.size() != 3) {
+ ctx.Error(Pos_) << "HoppingWindow requires three arguments";
return false;
}
@@ -2194,14 +2199,19 @@ bool THoppingWindow::DoInit(TContext& ctx, ISource* src) {
return false;
}
- auto hopExpr = Args_[0];
- auto intervalExpr = Args_[1];
- if (!(hopExpr->Init(ctx, FakeSource_.Get()) && intervalExpr->Init(ctx, FakeSource_.Get()))) {
+ auto timeExtractor = Args_[0];
+ auto hopExpr = Args_[1];
+ auto intervalExpr = Args_[2];
+
+ if (!timeExtractor->Init(ctx, src) ||
+ !hopExpr->Init(ctx, FakeSource_.Get()) ||
+ !intervalExpr->Init(ctx, FakeSource_.Get())) {
return false;
}
- Hop = ProcessIntervalParam(hopExpr);
- Interval = ProcessIntervalParam(intervalExpr);
+ TimeExtractor_ = timeExtractor;
+ Hop_ = ProcessIntervalParam(hopExpr);
+ Interval_ = ProcessIntervalParam(intervalExpr);
return true;
}
@@ -2613,54 +2623,71 @@ private:
TString Mode_;
};
-template <bool IsStart>
-class THoppingTime final: public TAstListNode {
+template<bool IsStart>
+class THoppingTime final : public TAstListNode {
public:
- THoppingTime(TPosition pos, const TVector<TNodePtr>& args = {})
+ THoppingTime(TPosition pos, TVector<TNodePtr> args)
: TAstListNode(pos)
- {
- Y_UNUSED(args);
- }
+ , Args_(std::move(args))
+ {}
private:
- TNodePtr DoClone() const override {
- return new THoppingTime(GetPos());
- }
-
bool DoInit(TContext& ctx, ISource* src) override {
- Y_UNUSED(ctx);
+ if (!src || src->IsFake()) {
+ ctx.Error(Pos_) << GetOpName() << " requires data source";
+ return false;
+ }
+
+ if (Args_.size() > 0) {
+ ctx.Error(Pos_) << GetOpName() << " requires exactly 0 arguments";
+ return false;
+ }
auto legacySpec = src->GetLegacyHoppingWindowSpec();
auto spec = src->GetHoppingWindowSpec();
if (!legacySpec && !spec) {
- ctx.Error(Pos_) << "No hopping window parameters in aggregation";
+ if (src->HasAggregations()) {
+ ctx.Error(Pos_) << GetOpName() << " can not be used here: HoppingWindow specification is missing in GROUP BY";
+ } else {
+ ctx.Error(Pos_) << GetOpName() << " can not be used without aggregation by HoppingWindow";
+ }
return false;
}
- Nodes_.clear();
-
const auto fieldName = legacySpec
? "_yql_time"
: spec->GetLabel();
- const auto interval = legacySpec
+ if constexpr (IsStart) {
+ const auto interval = legacySpec
? legacySpec->Interval
- : dynamic_cast<THoppingWindow*>(spec.Get())->Interval;
+ : dynamic_cast<THoppingWindow*>(spec.Get())->GetInterval();
- if (!IsStart) {
+ Add("Sub",
+ Y("Member", "row", Q(fieldName)),
+ interval
+ );
+ } else {
Add("Member", "row", Q(fieldName));
- return true;
}
- Add("Sub",
- Y("Member", "row", Q(fieldName)),
- interval);
return true;
}
void DoUpdateState() const override {
State_.Set(ENodeState::Aggregated, true);
}
+
+ TNodePtr DoClone() const override {
+ return new THoppingTime<IsStart>(Pos_, CloneContainer(Args_));
+ }
+
+ TString GetOpName() const override {
+ return IsStart ? "HopStart" : "HopEnd";
+ }
+
+private:
+ TVector<TNodePtr> Args_;
};
class TInvalidBuiltin final: public INode {