diff options
author | vokayndzop <[email protected]> | 2025-08-08 16:11:35 +0300 |
---|---|---|
committer | vokayndzop <[email protected]> | 2025-08-08 16:39:46 +0300 |
commit | 2c274bc53d8b06b40ef04a656f5665e0ba94a63d (patch) | |
tree | 3a1f44a67a369d3860ddbe6d6dadf50222f3af0d | |
parent | 950c5567a0edd38282fce2860727a9cfa9e3a9ac (diff) |
Watermarks: DDL
commit_hash:64ad6b4138ee27f474b440e8ef3e07fac1935346
-rw-r--r-- | yql/essentials/core/type_ann/type_ann_list.cpp | 15 | ||||
-rw-r--r-- | yql/essentials/sql/v1/SQLv1.g.in | 6 | ||||
-rw-r--r-- | yql/essentials/sql/v1/SQLv1Antlr4.g.in | 6 | ||||
-rw-r--r-- | yql/essentials/sql/v1/complete/sql_complete_ut.cpp | 3 | ||||
-rw-r--r-- | yql/essentials/sql/v1/query.cpp | 14 | ||||
-rw-r--r-- | yql/essentials/sql/v1/select.cpp | 25 | ||||
-rw-r--r-- | yql/essentials/sql/v1/source.h | 1 | ||||
-rw-r--r-- | yql/essentials/sql/v1/sql_translation.cpp | 15 | ||||
-rw-r--r-- | yql/essentials/sql/v1/sql_ut.h | 2 | ||||
-rw-r--r-- | yql/essentials/sql/v1/sql_ut_antlr4.h | 2 | ||||
-rw-r--r-- | yql/essentials/sql/v1/sql_ut_common.h | 41 | ||||
-rw-r--r-- | yql/essentials/udfs/language/yql/yql_language_udf.cpp | 5 |
12 files changed, 124 insertions, 11 deletions
diff --git a/yql/essentials/core/type_ann/type_ann_list.cpp b/yql/essentials/core/type_ann/type_ann_list.cpp index 678a5c76a3a..61706350662 100644 --- a/yql/essentials/core/type_ann/type_ann_list.cpp +++ b/yql/essentials/core/type_ann/type_ann_list.cpp @@ -6925,13 +6925,15 @@ namespace { return IGraphTransformer::TStatus::Error; } - if (!UpdateLambdaAllArgumentsTypes(lambdaTimeExtractor, {itemType}, ctx.Expr)) - { + if (!UpdateLambdaAllArgumentsTypes(lambdaTimeExtractor, {itemType}, ctx.Expr)) { return IGraphTransformer::TStatus::Error; } if (!lambdaTimeExtractor->GetTypeAnn()) { return IGraphTransformer::TStatus::Repeat; } + if (!EnsureSpecificDataType(*lambdaTimeExtractor, EDataSlot::Timestamp, ctx.Expr, true)) { + return IGraphTransformer::TStatus::Error; + } if (!EnsureSpecificDataType(*hop, EDataSlot::Interval, ctx.Expr, true)) { return IGraphTransformer::TStatus::Error; @@ -7077,8 +7079,7 @@ namespace { return IGraphTransformer::TStatus::Error; } - if (!UpdateLambdaAllArgumentsTypes(lambdaKeyExtractor, {itemType}, ctx.Expr)) - { + if (!UpdateLambdaAllArgumentsTypes(lambdaKeyExtractor, {itemType}, ctx.Expr)) { return IGraphTransformer::TStatus::Error; } auto keyType = lambdaKeyExtractor->GetTypeAnn(); @@ -7086,13 +7087,15 @@ namespace { return IGraphTransformer::TStatus::Repeat; } - if (!UpdateLambdaAllArgumentsTypes(lambdaTimeExtractor, {itemType}, ctx.Expr)) - { + if (!UpdateLambdaAllArgumentsTypes(lambdaTimeExtractor, {itemType}, ctx.Expr)) { return IGraphTransformer::TStatus::Error; } if (!lambdaTimeExtractor->GetTypeAnn()) { return IGraphTransformer::TStatus::Repeat; } + if (!EnsureSpecificDataType(*lambdaTimeExtractor, EDataSlot::Timestamp, ctx.Expr, true)) { + return IGraphTransformer::TStatus::Error; + } if (!EnsureSpecificDataType(*hop, EDataSlot::Interval, ctx.Expr, true)) { return IGraphTransformer::TStatus::Error; diff --git a/yql/essentials/sql/v1/SQLv1.g.in b/yql/essentials/sql/v1/SQLv1.g.in index 4c3b74b7e11..b8892cc449d 100644 --- a/yql/essentials/sql/v1/SQLv1.g.in +++ b/yql/essentials/sql/v1/SQLv1.g.in @@ -978,6 +978,7 @@ table_hint: an_id_hint (EQUALS (type_name_tag | LPAREN type_name_tag (COMMA type_name_tag)* COMMA? RPAREN))? | (SCHEMA | COLUMNS) EQUALS? type_name_or_bind | SCHEMA EQUALS? LPAREN (struct_arg_positional (COMMA struct_arg_positional)*)? COMMA? RPAREN + | WATERMARK AS LPAREN expr RPAREN ; object_ref: (cluster_expr DOT)? id_or_at; @@ -988,7 +989,7 @@ into_simple_table_ref: simple_table_ref (ERASE BY pure_column_list)?; delete_stmt: BATCH? DELETE FROM simple_table_ref (WHERE expr | ON into_values_source)? returning_columns_list?; update_stmt: BATCH? UPDATE simple_table_ref (SET set_clause_choice (WHERE expr)? | ON into_values_source) returning_columns_list?; -/// out of 2003 standart +/// out of 2003 standard set_clause_choice: set_clause_list | multiple_column_assignment; set_clause_list: set_clause (COMMA set_clause)*; @@ -1540,6 +1541,7 @@ keyword_as_compat: | VALUES // | VIEW | VIRTUAL + | WATERMARK // | WITH | WRAPPER // | WRITE @@ -1769,6 +1771,7 @@ keyword_compat: ( | VALUES | VIEW | VIRTUAL + | WATERMARK | WITH | WRAPPER // | WRITE @@ -2155,6 +2158,7 @@ VALUES: V A L U E S; VARIANT: V A R I A N T; VIEW: V I E W; VIRTUAL: V I R T U A L; +WATERMARK: W A T E R M A R K; WHEN: W H E N; WHERE: W H E R E; WINDOW: W I N D O W; diff --git a/yql/essentials/sql/v1/SQLv1Antlr4.g.in b/yql/essentials/sql/v1/SQLv1Antlr4.g.in index 120caa7751b..421efdd3ee4 100644 --- a/yql/essentials/sql/v1/SQLv1Antlr4.g.in +++ b/yql/essentials/sql/v1/SQLv1Antlr4.g.in @@ -978,6 +978,7 @@ table_hint: an_id_hint (EQUALS (type_name_tag | LPAREN type_name_tag (COMMA type_name_tag)* COMMA? RPAREN))? | (SCHEMA | COLUMNS) EQUALS? type_name_or_bind | SCHEMA EQUALS? LPAREN (struct_arg_positional (COMMA struct_arg_positional)*)? COMMA? RPAREN + | WATERMARK AS LPAREN expr RPAREN ; object_ref: (cluster_expr DOT)? id_or_at; @@ -988,7 +989,7 @@ into_simple_table_ref: simple_table_ref (ERASE BY pure_column_list)?; delete_stmt: BATCH? DELETE FROM simple_table_ref (WHERE expr | ON into_values_source)? returning_columns_list?; update_stmt: BATCH? UPDATE simple_table_ref (SET set_clause_choice (WHERE expr)? | ON into_values_source) returning_columns_list?; -/// out of 2003 standart +/// out of 2003 standard set_clause_choice: set_clause_list | multiple_column_assignment; set_clause_list: set_clause (COMMA set_clause)*; @@ -1540,6 +1541,7 @@ keyword_as_compat: | VALUES // | VIEW | VIRTUAL + | WATERMARK // | WITH | WRAPPER // | WRITE @@ -1770,6 +1772,7 @@ keyword_compat: ( | VIEW | VIRTUAL | WITH + | WATERMARK | WRAPPER // | WRITE | XOR @@ -2157,6 +2160,7 @@ VALUES: V A L U E S; VARIANT: V A R I A N T; VIEW: V I E W; VIRTUAL: V I R T U A L; +WATERMARK: W A T E R M A R K; WHEN: W H E N; WHERE: W H E R E; WINDOW: W I N D O W; diff --git a/yql/essentials/sql/v1/complete/sql_complete_ut.cpp b/yql/essentials/sql/v1/complete/sql_complete_ut.cpp index 604af6008d9..bffe15a869b 100644 --- a/yql/essentials/sql/v1/complete/sql_complete_ut.cpp +++ b/yql/essentials/sql/v1/complete/sql_complete_ut.cpp @@ -932,6 +932,7 @@ Y_UNIT_TEST_SUITE(SqlCompleteTests) { TVector<TCandidate> expected = { {Keyword, "COLUMNS"}, {Keyword, "SCHEMA"}, + {Keyword, "WATERMARK AS()", 1}, {HintName, "XLOCK"}, }; UNIT_ASSERT_VALUES_EQUAL(Complete(engine, "REDUCE my_table WITH "), expected); @@ -940,6 +941,7 @@ Y_UNIT_TEST_SUITE(SqlCompleteTests) { TVector<TCandidate> expected = { {Keyword, "COLUMNS"}, {Keyword, "SCHEMA"}, + {Keyword, "WATERMARK AS()", 1}, {HintName, "XLOCK"}, }; UNIT_ASSERT_VALUES_EQUAL(Complete(engine, "SELECT key FROM my_table WITH "), expected); @@ -951,6 +953,7 @@ Y_UNIT_TEST_SUITE(SqlCompleteTests) { {Keyword, "COLUMNS"}, {HintName, "EXPIRATION"}, {Keyword, "SCHEMA"}, + {Keyword, "WATERMARK AS()", 1}, }; auto engine = MakeSqlCompletionEngineUT(); diff --git a/yql/essentials/sql/v1/query.cpp b/yql/essentials/sql/v1/query.cpp index ffbfd669ac9..e0da16ff05d 100644 --- a/yql/essentials/sql/v1/query.cpp +++ b/yql/essentials/sql/v1/query.cpp @@ -862,6 +862,20 @@ public: ctx.IncrementMonCounter("sql_errors", "NormalizeHintError"); return false; } + + if ("watermark" == hintName) { + TNodePtr option = Y(BuildQuotedAtom(Pos_, hintName)); + auto anyColumnSrc = BuildAnyColumnSource(Pos_); + for (auto& x : hint.second) { + if (!x->Init(ctx, anyColumnSrc.Get())) { + return false; + } + option = L(option, x); + } + Nodes_.push_back(Q(option)); + continue; + } + TNodePtr option = Y(BuildQuotedAtom(Pos_, hintName)); for (auto& x : hint.second) { if (!x->Init(ctx, src)) { diff --git a/yql/essentials/sql/v1/select.cpp b/yql/essentials/sql/v1/select.cpp index 03a8483ce0c..c75f18df30a 100644 --- a/yql/essentials/sql/v1/select.cpp +++ b/yql/essentials/sql/v1/select.cpp @@ -3134,6 +3134,31 @@ TSourcePtr BuildSelect(TPosition pos, TSourcePtr source, TNodePtr skipTake) { return new TSelect(pos, std::move(source), skipTake); } +class TAnyColumnSource final : public ISource { +public: + TAnyColumnSource(TPosition pos) : ISource(pos) {} + + bool DoInit(TContext&, ISource*) final { + return true; + } + + TNodePtr Build(TContext&) final { + return nullptr; + } + + TNodePtr DoClone() const final { + return MakeIntrusive<TAnyColumnSource>(Pos_); + } + + TMaybe<bool> AddColumn(TContext&, TColumnNode&) final { + return {true}; + } +}; + +TSourcePtr BuildAnyColumnSource(TPosition pos) { + return new TAnyColumnSource(pos); +} + class TSelectResultNode final: public TAstListNode { public: TSelectResultNode(TPosition pos, TSourcePtr source, bool writeResult, bool inSubquery, diff --git a/yql/essentials/sql/v1/source.h b/yql/essentials/sql/v1/source.h index 2c42dda0877..9a7a1fa8540 100644 --- a/yql/essentials/sql/v1/source.h +++ b/yql/essentials/sql/v1/source.h @@ -276,6 +276,7 @@ namespace NSQLTranslationV1 { TColumnsSets&& distinctSets ); TSourcePtr BuildSelect(TPosition pos, TSourcePtr source, TNodePtr skipTake); + TSourcePtr BuildAnyColumnSource(TPosition pos); enum class ReduceMode { diff --git a/yql/essentials/sql/v1/sql_translation.cpp b/yql/essentials/sql/v1/sql_translation.cpp index 42e03775303..f4adf24ce62 100644 --- a/yql/essentials/sql/v1/sql_translation.cpp +++ b/yql/essentials/sql/v1/sql_translation.cpp @@ -3457,6 +3457,7 @@ bool TSqlTranslation::TableHintImpl(const TRule_table_hint& rule, TTableHints& h // an_id_hint (EQUALS (type_name_tag | LPAREN type_name_tag (COMMA type_name_tag)* COMMA? RPAREN))? // | (SCHEMA | COLUMNS) EQUALS? type_name_or_bind // | SCHEMA EQUALS? LPAREN (struct_arg_positional (COMMA struct_arg_positional)*)? COMMA? RPAREN + // | WATERMARK AS LPAREN expr RPAREN switch (rule.Alt_case()) { case TRule_table_hint::kAltTableHint1: { const auto& alt = rule.GetAlt_table_hint1(); @@ -3569,6 +3570,18 @@ bool TSqlTranslation::TableHintImpl(const TRule_table_hint& rule, TTableHints& h } } + case TRule_table_hint::kAltTableHint4: { + const auto& alt = rule.GetAlt_table_hint4(); + const auto pos = Ctx_.TokenPosition(alt.GetToken1()); + TColumnRefScope scope(Ctx_, EColumnRefState::Allow); + auto expr = TSqlExpression(Ctx_, Mode_).Build(alt.GetRule_expr4()); + if (!expr) { + return false; + } + hints["watermark"] = { BuildLambda(pos, BuildList(pos, {BuildAtom(pos, "row")}), std::move(expr)) }; + break; + } + case TRule_table_hint::ALT_NOT_SET: Y_ABORT("You should change implementation according to grammar changes"); } @@ -4490,7 +4503,7 @@ bool TSqlTranslation::FrameBound(const TRule_window_frame_bound& rule, TFrameBou break; } case TRule_window_frame_bound::ALT_NOT_SET: - Y_ABORT("FrameClause: frame bound not corresond to grammar changes"); + Y_ABORT("FrameClause: frame bound not correspond to grammar changes"); } return true; } diff --git a/yql/essentials/sql/v1/sql_ut.h b/yql/essentials/sql/v1/sql_ut.h index 37e06e8708b..fb18b1658f3 100644 --- a/yql/essentials/sql/v1/sql_ut.h +++ b/yql/essentials/sql/v1/sql_ut.h @@ -24,7 +24,7 @@ enum class EDebugOutput { const ui32 PRETTY_FLAGS = NYql::TAstPrintFlags::PerLine | NYql::TAstPrintFlags::ShortQuote | NYql::TAstPrintFlags::AdaptArbitraryContent; -inline TString Err2Str(NYql::TAstParseResult& res, EDebugOutput debug = EDebugOutput::None) { +inline TString Err2Str(const NYql::TAstParseResult& res, EDebugOutput debug = EDebugOutput::None) { TStringStream s; res.Issues.PrintTo(s); diff --git a/yql/essentials/sql/v1/sql_ut_antlr4.h b/yql/essentials/sql/v1/sql_ut_antlr4.h index 03d804707f0..a8fdd11e257 100644 --- a/yql/essentials/sql/v1/sql_ut_antlr4.h +++ b/yql/essentials/sql/v1/sql_ut_antlr4.h @@ -24,7 +24,7 @@ enum class EDebugOutput { const ui32 PRETTY_FLAGS = NYql::TAstPrintFlags::PerLine | NYql::TAstPrintFlags::ShortQuote | NYql::TAstPrintFlags::AdaptArbitraryContent; -inline TString Err2Str(NYql::TAstParseResult& res, EDebugOutput debug = EDebugOutput::None) { +inline TString Err2Str(const NYql::TAstParseResult& res, EDebugOutput debug = EDebugOutput::None) { TStringStream s; res.Issues.PrintTo(s); diff --git a/yql/essentials/sql/v1/sql_ut_common.h b/yql/essentials/sql/v1/sql_ut_common.h index c43ef974b97..ac9ebdbc5c6 100644 --- a/yql/essentials/sql/v1/sql_ut_common.h +++ b/yql/essentials/sql/v1/sql_ut_common.h @@ -8949,3 +8949,44 @@ Y_UNIT_TEST_SUITE(Aggregation) { } } + +Y_UNIT_TEST_SUITE(Watermarks) { + Y_UNIT_TEST(Insert) { + const auto stmt = R"sql( +USE plato; + +INSERT INTO Output +SELECT + * +FROM Input +WITH( + SCHEMA( + ts Timestamp, + ), + WATERMARK AS (ts) +); +)sql"; + const auto& res = SqlToYql(stmt); + Err2Str(res, EDebugOutput::ToCerr); + UNIT_ASSERT(res.IsOk()); + } + + Y_UNIT_TEST(Select) { + const auto stmt = R"sql( +USE plato; + +SELECT + * +FROM Input +WITH( + SCHEMA( + ts Timestamp, + ), + WATERMARK AS (ts) +); +)sql"; + const auto& res = SqlToYql(stmt); + Err2Str(res, EDebugOutput::ToCerr); + UNIT_ASSERT(res.IsOk()); + } +} diff --git a/yql/essentials/udfs/language/yql/yql_language_udf.cpp b/yql/essentials/udfs/language/yql/yql_language_udf.cpp index 3a83dfd3391..d72d74b3a90 100644 --- a/yql/essentials/udfs/language/yql/yql_language_udf.cpp +++ b/yql/essentials/udfs/language/yql/yql_language_udf.cpp @@ -122,6 +122,11 @@ private: Freqs_[std::make_pair(parent, alt.GetToken1().GetValue())] += 1; break; } + case TRule_table_hint::kAltTableHint4: { + const auto& alt = msg.GetAlt_table_hint4(); + Freqs_[std::make_pair(parent, alt.GetToken1().GetValue())] += 1; + break; + } case TRule_table_hint::ALT_NOT_SET: return; } |