diff options
author | vokayndzop <[email protected]> | 2025-08-14 12:16:50 +0300 |
---|---|---|
committer | vokayndzop <[email protected]> | 2025-08-14 13:10:46 +0300 |
commit | 618abcfd3763bc44abbc9ef934a8b1d71dc9c674 (patch) | |
tree | 272ad8a5da9f5603ab59d41fcfcd93c7a5963e95 | |
parent | bf42f3c2a0fe15d25acb32dd153f19102e205e0a (diff) |
Watermarks: add time extractor to HoppingWindow
commit_hash:d54496a7e3d9bd72e53ce880691a59490a34a138
-rw-r--r-- | yql/essentials/sql/v1/builtin.cpp | 109 | ||||
-rw-r--r-- | yql/essentials/sql/v1/source.h | 22 | ||||
-rw-r--r-- | yql/essentials/sql/v1/sql_ut_common.h | 151 |
3 files changed, 232 insertions, 50 deletions
diff --git a/yql/essentials/sql/v1/builtin.cpp b/yql/essentials/sql/v1/builtin.cpp index d2a368be60f..c660d35817c 100644 --- a/yql/essentials/sql/v1/builtin.cpp +++ b/yql/essentials/sql/v1/builtin.cpp @@ -2152,30 +2152,35 @@ private: TNodePtr Node_; }; -THoppingWindow::THoppingWindow(TPosition pos, const TVector<TNodePtr>& args) +THoppingWindow::THoppingWindow(TPosition pos, TVector<TNodePtr> args) : INode(pos) - , Args_(args) + , Args_(std::move(args)) , FakeSource_(BuildFakeSource(pos)) , Valid_(false) {} -void THoppingWindow::MarkValid() { - YQL_ENSURE(!HasState(ENodeState::Initialized)); - Valid_ = true; -} - TNodePtr THoppingWindow::BuildTraits(const TString& label) const { YQL_ENSURE(HasState(ENodeState::Initialized)); return Y( "HoppingTraits", Y("ListItemType", Y("TypeOf", label)), - BuildLambda(Pos_, Y("row"), Y("Just", Y("SystemMetadata", Y("String", Q("write_time")), Y("DependsOn", "row")))), - Hop, - Interval, - Interval, - Q("true"), - Q("v2")); + BuildLambda(Pos_, Y("row"), TimeExtractor_), + Hop_, + Interval_, + Delay_, + Q(DataWatermarks_), + Q("v2") + ); +} + +TNodePtr THoppingWindow::GetInterval() const { + return Interval_; +} + +void THoppingWindow::MarkValid() { + YQL_ENSURE(!HasState(ENodeState::Initialized)); + Valid_ = true; } bool THoppingWindow::DoInit(TContext& ctx, ISource* src) { @@ -2184,8 +2189,8 @@ bool THoppingWindow::DoInit(TContext& ctx, ISource* src) { return false; } - if (!(Args_.size() == 2)) { - ctx.Error(Pos_) << "HoppingWindow requires two arguments"; + if (Args_.size() != 3) { + ctx.Error(Pos_) << "HoppingWindow requires three arguments"; return false; } @@ -2194,14 +2199,19 @@ bool THoppingWindow::DoInit(TContext& ctx, ISource* src) { return false; } - auto hopExpr = Args_[0]; - auto intervalExpr = Args_[1]; - if (!(hopExpr->Init(ctx, FakeSource_.Get()) && intervalExpr->Init(ctx, FakeSource_.Get()))) { + auto timeExtractor = Args_[0]; + auto hopExpr = Args_[1]; + auto intervalExpr = Args_[2]; + + if (!timeExtractor->Init(ctx, src) || + !hopExpr->Init(ctx, FakeSource_.Get()) || + !intervalExpr->Init(ctx, FakeSource_.Get())) { return false; } - Hop = ProcessIntervalParam(hopExpr); - Interval = ProcessIntervalParam(intervalExpr); + TimeExtractor_ = timeExtractor; + Hop_ = ProcessIntervalParam(hopExpr); + Interval_ = ProcessIntervalParam(intervalExpr); return true; } @@ -2613,54 +2623,71 @@ private: TString Mode_; }; -template <bool IsStart> -class THoppingTime final: public TAstListNode { +template<bool IsStart> +class THoppingTime final : public TAstListNode { public: - THoppingTime(TPosition pos, const TVector<TNodePtr>& args = {}) + THoppingTime(TPosition pos, TVector<TNodePtr> args) : TAstListNode(pos) - { - Y_UNUSED(args); - } + , Args_(std::move(args)) + {} private: - TNodePtr DoClone() const override { - return new THoppingTime(GetPos()); - } - bool DoInit(TContext& ctx, ISource* src) override { - Y_UNUSED(ctx); + if (!src || src->IsFake()) { + ctx.Error(Pos_) << GetOpName() << " requires data source"; + return false; + } + + if (Args_.size() > 0) { + ctx.Error(Pos_) << GetOpName() << " requires exactly 0 arguments"; + return false; + } auto legacySpec = src->GetLegacyHoppingWindowSpec(); auto spec = src->GetHoppingWindowSpec(); if (!legacySpec && !spec) { - ctx.Error(Pos_) << "No hopping window parameters in aggregation"; + if (src->HasAggregations()) { + ctx.Error(Pos_) << GetOpName() << " can not be used here: HoppingWindow specification is missing in GROUP BY"; + } else { + ctx.Error(Pos_) << GetOpName() << " can not be used without aggregation by HoppingWindow"; + } return false; } - Nodes_.clear(); - const auto fieldName = legacySpec ? "_yql_time" : spec->GetLabel(); - const auto interval = legacySpec + if constexpr (IsStart) { + const auto interval = legacySpec ? legacySpec->Interval - : dynamic_cast<THoppingWindow*>(spec.Get())->Interval; + : dynamic_cast<THoppingWindow*>(spec.Get())->GetInterval(); - if (!IsStart) { + Add("Sub", + Y("Member", "row", Q(fieldName)), + interval + ); + } else { Add("Member", "row", Q(fieldName)); - return true; } - Add("Sub", - Y("Member", "row", Q(fieldName)), - interval); return true; } void DoUpdateState() const override { State_.Set(ENodeState::Aggregated, true); } + + TNodePtr DoClone() const override { + return new THoppingTime<IsStart>(Pos_, CloneContainer(Args_)); + } + + TString GetOpName() const override { + return IsStart ? "HopStart" : "HopEnd"; + } + +private: + TVector<TNodePtr> Args_; }; class TInvalidBuiltin final: public INode { diff --git a/yql/essentials/sql/v1/source.h b/yql/essentials/sql/v1/source.h index 9a7a1fa8540..8aaadd2b840 100644 --- a/yql/essentials/sql/v1/source.h +++ b/yql/essentials/sql/v1/source.h @@ -210,12 +210,11 @@ namespace NSQLTranslationV1 { class THoppingWindow final : public INode { public: - THoppingWindow(TPosition pos, const TVector<TNodePtr>& args); - void MarkValid(); + THoppingWindow(TPosition pos, TVector<TNodePtr> args); TNodePtr BuildTraits(const TString& label) const; - public: - TNodePtr Hop; - TNodePtr Interval; + TNodePtr GetInterval() const; + void MarkValid(); + private: bool DoInit(TContext& ctx, ISource* src) override; TAstNode* Translate(TContext&) const override; @@ -224,9 +223,14 @@ namespace NSQLTranslationV1 { TString GetOpName() const override; TNodePtr ProcessIntervalParam(const TNodePtr& val) const; + private: TVector<TNodePtr> Args_; TSourcePtr FakeSource_; - TNodePtr Node_; + TNodePtr TimeExtractor_; + TNodePtr Hop_; + TNodePtr Interval_; + const TNodePtr Delay_ = Y("Interval", Q("0")); + const TString DataWatermarks_ = "true"; bool Valid_; }; @@ -287,13 +291,13 @@ namespace NSQLTranslationV1 { TVector<TNodePtr>&& keys, TVector<TNodePtr>&& args, TNodePtr udf, TNodePtr having, const TWriteSettings& settings, const TVector<TSortSpecificationPtr>& assumeOrderBy, bool listCall); TSourcePtr BuildProcess(TPosition pos, TSourcePtr source, TNodePtr with, bool withExtFunction, TVector<TNodePtr>&& terms, bool listCall, - bool prcessStream, const TWriteSettings& settings, const TVector<TSortSpecificationPtr>& assumeOrderBy); + bool processStream, const TWriteSettings& settings, const TVector<TSortSpecificationPtr>& assumeOrderBy); TNodePtr BuildSelectResult(TPosition pos, TSourcePtr source, bool writeResult, bool inSubquery, TScopedStatePtr scoped); // Implemented in insert.cpp - TSourcePtr BuildWriteValues(TPosition pos, const TString& opertationHumanName, const TVector<TString>& columnsHint, const TVector<TVector<TNodePtr>>& values); - TSourcePtr BuildWriteValues(TPosition pos, const TString& opertationHumanName, const TVector<TString>& columnsHint, TSourcePtr source); + TSourcePtr BuildWriteValues(TPosition pos, const TString& operationHumanName, const TVector<TString>& columnsHint, const TVector<TVector<TNodePtr>>& values); + TSourcePtr BuildWriteValues(TPosition pos, const TString& operationHumanName, const TVector<TString>& columnsHint, TSourcePtr source); TSourcePtr BuildUpdateValues(TPosition pos, const TVector<TString>& columnsHint, const TVector<TNodePtr>& values); EWriteColumnMode ToWriteColumnsMode(ESQLWriteColumnMode sqlWriteColumnMode); diff --git a/yql/essentials/sql/v1/sql_ut_common.h b/yql/essentials/sql/v1/sql_ut_common.h index 95bd2ce5e40..46d01395bec 100644 --- a/yql/essentials/sql/v1/sql_ut_common.h +++ b/yql/essentials/sql/v1/sql_ut_common.h @@ -9045,3 +9045,154 @@ WITH( UNIT_ASSERT(res.IsOk()); } } + +Y_UNIT_TEST_SUITE(HoppingWindow) { + Y_UNIT_TEST(HoppingWindow) { + auto query = R"sql( + SELECT + * + FROM plato.Input + GROUP BY HoppingWindow(key, 39, 42); + )sql"; + + NYql::TAstParseResult res = SqlToYql(query); + UNIT_ASSERT_VALUES_UNEQUAL(nullptr, res.Root); + UNIT_ASSERT(res.IsOk()); + UNIT_ASSERT_VALUES_EQUAL(0, res.Issues.Size()); + } + + Y_UNIT_TEST(HoppingWindowWithoutSource) { + ExpectFailWithError( + R"sql(SELECT 1 + HoppingWindow(key, 39, 42);)sql", + "<main>:1:12: Error: HoppingWindow requires data source\n" + ); + } + + Y_UNIT_TEST(HoppingWindowInProjection) { + ExpectFailWithError( + R"sql(SELECT 1 + HoppingWindow(key, 39, 42) FROM plato.Input;)sql", + "<main>:1:12: Error: HoppingWindow can only be used as a top-level GROUP BY expression\n" + ); + } + + Y_UNIT_TEST(HoppingWindowWithNonConstIntervals) { + ExpectFailWithError( + R"sql( + SELECT + key, + hopping_start + FROM plato.Input + GROUP BY + HoppingWindow(key, 39 + subkey, 42) AS hopping_start, + key; + )sql", + + "<main>:7:21: Error: Source does not allow column references\n" + "<main>:7:45: Error: Column reference 'subkey'\n" + ); + + ExpectFailWithError( + R"sql( + SELECT + key, + hopping_start + FROM plato.Input + GROUP BY + HoppingWindow(key, 39 + subkey, 42) AS hopping_start, + key; + )sql", + + "<main>:7:21: Error: Source does not allow column references\n" + "<main>:7:45: Error: Column reference 'subkey'\n" + ); + } + + Y_UNIT_TEST(HoppingWindowWithWrongNumberOfArgs) { + ExpectFailWithError( + R"sql( + SELECT + * + FROM plato.Input + GROUP BY HoppingWindow(key, 39); + )sql", + + "<main>:5:26: Error: HoppingWindow requires three arguments\n" + ); + + ExpectFailWithError( + R"sql( + SELECT + * + FROM plato.Input + GROUP BY HoppingWindow(key, 39, 42, 63); + )sql", + + "<main>:5:26: Error: HoppingWindow requires three arguments\n" + ); + } + + Y_UNIT_TEST(DuplicateHoppingWindow) { + ExpectFailWithError( + R"sql( + SELECT + * + FROM plato.Input + GROUP BY + HoppingWindow(key, 39, 42), + subkey, + HoppingWindow(ts, 42, 39); + )sql", + + "<main>:8:21: Error: Duplicate hopping window specification:\n" + "<main>:6:21: Error: Previous hopping window is declared here\n" + ); + } + + Y_UNIT_TEST(HopStartEndWithoutSource) { + ExpectFailWithError( + R"sql(SELECT 1 + HopStart();)sql", + "<main>:1:12: Error: HopStart requires data source\n" + ); + + ExpectFailWithError( + R"sql(SELECT 1 + HopEnd();)sql", + "<main>:1:12: Error: HopEnd requires data source\n" + ); + } + + Y_UNIT_TEST(HopStartEndWithoutGroupByOrWindow) { + ExpectFailWithError( + R"sql(SELECT 1 + HopStart() FROM plato.Input;)sql", + "<main>:1:12: Error: HopStart can not be used without aggregation by HoppingWindow\n" + ); + + ExpectFailWithError( + R"sql(SELECT 1 + HopEnd() FROM plato.Input;)sql", + "<main>:1:12: Error: HopEnd can not be used without aggregation by HoppingWindow\n" + ); + } + + Y_UNIT_TEST(HopStartEndWithGroupByWithoutHopping) { + ExpectFailWithError( + R"sql( + SELECT + 1 + HopStart() + FROM plato.Input + GROUP BY user; + )sql", + + "<main>:3:25: Error: HopStart can not be used here: HoppingWindow specification is missing in GROUP BY\n" + ); + + ExpectFailWithError( + R"sql( + SELECT + 1 + HopEnd() + FROM plato.Input + GROUP BY user; + )sql", + + "<main>:3:25: Error: HopEnd can not be used here: HoppingWindow specification is missing in GROUP BY\n" + ); + } +} |