summary | refs | log | tree | commit | diff | stats
diff options
context:
space:
mode:
author: vokayndzop <[email protected]> 2025-08-08 16:11:35 +0300
committer: vokayndzop <[email protected]> 2025-08-08 16:39:46 +0300
commit: 2c274bc53d8b06b40ef04a656f5665e0ba94a63d (patch)
tree: 3a1f44a67a369d3860ddbe6d6dadf50222f3af0d
parent: 950c5567a0edd38282fce2860727a9cfa9e3a9ac (diff)
Watermarks: DDL
commit_hash:64ad6b4138ee27f474b440e8ef3e07fac1935346
-rw-r--r-- yql/essentials/core/type_ann/type_ann_list.cpp | 15
-rw-r--r-- yql/essentials/sql/v1/SQLv1.g.in | 6
-rw-r--r-- yql/essentials/sql/v1/SQLv1Antlr4.g.in | 6
-rw-r--r-- yql/essentials/sql/v1/complete/sql_complete_ut.cpp | 3
-rw-r--r-- yql/essentials/sql/v1/query.cpp | 14
-rw-r--r-- yql/essentials/sql/v1/select.cpp | 25
-rw-r--r-- yql/essentials/sql/v1/source.h | 1
-rw-r--r-- yql/essentials/sql/v1/sql_translation.cpp | 15
-rw-r--r-- yql/essentials/sql/v1/sql_ut.h | 2
-rw-r--r-- yql/essentials/sql/v1/sql_ut_antlr4.h | 2
-rw-r--r-- yql/essentials/sql/v1/sql_ut_common.h | 41
-rw-r--r-- yql/essentials/udfs/language/yql/yql_language_udf.cpp | 5
12 files changed, 124 insertions, 11 deletions
diff --git a/yql/essentials/core/type_ann/type_ann_list.cpp b/yql/essentials/core/type_ann/type_ann_list.cpp
index 678a5c76a3a..61706350662 100644
--- a/yql/essentials/core/type_ann/type_ann_list.cpp
+++ b/yql/essentials/core/type_ann/type_ann_list.cpp
@@ -6925,13 +6925,15 @@ namespace {
return IGraphTransformer::TStatus::Error;
}
- if (!UpdateLambdaAllArgumentsTypes(lambdaTimeExtractor, {itemType}, ctx.Expr))
- {
+ if (!UpdateLambdaAllArgumentsTypes(lambdaTimeExtractor, {itemType}, ctx.Expr)) {
return IGraphTransformer::TStatus::Error;
}
if (!lambdaTimeExtractor->GetTypeAnn()) {
return IGraphTransformer::TStatus::Repeat;
}
+ if (!EnsureSpecificDataType(*lambdaTimeExtractor, EDataSlot::Timestamp, ctx.Expr, true)) {
+ return IGraphTransformer::TStatus::Error;
+ }
if (!EnsureSpecificDataType(*hop, EDataSlot::Interval, ctx.Expr, true)) {
return IGraphTransformer::TStatus::Error;
@@ -7077,8 +7079,7 @@ namespace {
return IGraphTransformer::TStatus::Error;
}
- if (!UpdateLambdaAllArgumentsTypes(lambdaKeyExtractor, {itemType}, ctx.Expr))
- {
+ if (!UpdateLambdaAllArgumentsTypes(lambdaKeyExtractor, {itemType}, ctx.Expr)) {
return IGraphTransformer::TStatus::Error;
}
auto keyType = lambdaKeyExtractor->GetTypeAnn();
@@ -7086,13 +7087,15 @@ namespace {
return IGraphTransformer::TStatus::Repeat;
}
- if (!UpdateLambdaAllArgumentsTypes(lambdaTimeExtractor, {itemType}, ctx.Expr))
- {
+ if (!UpdateLambdaAllArgumentsTypes(lambdaTimeExtractor, {itemType}, ctx.Expr)) {
return IGraphTransformer::TStatus::Error;
}
if (!lambdaTimeExtractor->GetTypeAnn()) {
return IGraphTransformer::TStatus::Repeat;
}
+ if (!EnsureSpecificDataType(*lambdaTimeExtractor, EDataSlot::Timestamp, ctx.Expr, true)) {
+ return IGraphTransformer::TStatus::Error;
+ }
if (!EnsureSpecificDataType(*hop, EDataSlot::Interval, ctx.Expr, true)) {
return IGraphTransformer::TStatus::Error;
diff --git a/yql/essentials/sql/v1/SQLv1.g.in b/yql/essentials/sql/v1/SQLv1.g.in
index 4c3b74b7e11..b8892cc449d 100644
--- a/yql/essentials/sql/v1/SQLv1.g.in
+++ b/yql/essentials/sql/v1/SQLv1.g.in
@@ -978,6 +978,7 @@ table_hint:
an_id_hint (EQUALS (type_name_tag | LPAREN type_name_tag (COMMA type_name_tag)* COMMA? RPAREN))?
| (SCHEMA | COLUMNS) EQUALS? type_name_or_bind
| SCHEMA EQUALS? LPAREN (struct_arg_positional (COMMA struct_arg_positional)*)? COMMA? RPAREN
+ | WATERMARK AS LPAREN expr RPAREN
;
object_ref: (cluster_expr DOT)? id_or_at;
@@ -988,7 +989,7 @@ into_simple_table_ref: simple_table_ref (ERASE BY pure_column_list)?;
delete_stmt: BATCH? DELETE FROM simple_table_ref (WHERE expr | ON into_values_source)? returning_columns_list?;
update_stmt: BATCH? UPDATE simple_table_ref (SET set_clause_choice (WHERE expr)? | ON into_values_source) returning_columns_list?;
-/// out of 2003 standart
+/// out of 2003 standard
set_clause_choice: set_clause_list | multiple_column_assignment;
set_clause_list: set_clause (COMMA set_clause)*;
@@ -1540,6 +1541,7 @@ keyword_as_compat:
| VALUES
// | VIEW
| VIRTUAL
+ | WATERMARK
// | WITH
| WRAPPER
// | WRITE
@@ -1769,6 +1771,7 @@ keyword_compat: (
| VALUES
| VIEW
| VIRTUAL
+ | WATERMARK
| WITH
| WRAPPER
// | WRITE
@@ -2155,6 +2158,7 @@ VALUES: V A L U E S;
VARIANT: V A R I A N T;
VIEW: V I E W;
VIRTUAL: V I R T U A L;
+WATERMARK: W A T E R M A R K;
WHEN: W H E N;
WHERE: W H E R E;
WINDOW: W I N D O W;
diff --git a/yql/essentials/sql/v1/SQLv1Antlr4.g.in b/yql/essentials/sql/v1/SQLv1Antlr4.g.in
index 120caa7751b..421efdd3ee4 100644
--- a/yql/essentials/sql/v1/SQLv1Antlr4.g.in
+++ b/yql/essentials/sql/v1/SQLv1Antlr4.g.in
@@ -978,6 +978,7 @@ table_hint:
an_id_hint (EQUALS (type_name_tag | LPAREN type_name_tag (COMMA type_name_tag)* COMMA? RPAREN))?
| (SCHEMA | COLUMNS) EQUALS? type_name_or_bind
| SCHEMA EQUALS? LPAREN (struct_arg_positional (COMMA struct_arg_positional)*)? COMMA? RPAREN
+ | WATERMARK AS LPAREN expr RPAREN
;
object_ref: (cluster_expr DOT)? id_or_at;
@@ -988,7 +989,7 @@ into_simple_table_ref: simple_table_ref (ERASE BY pure_column_list)?;
delete_stmt: BATCH? DELETE FROM simple_table_ref (WHERE expr | ON into_values_source)? returning_columns_list?;
update_stmt: BATCH? UPDATE simple_table_ref (SET set_clause_choice (WHERE expr)? | ON into_values_source) returning_columns_list?;
-/// out of 2003 standart
+/// out of 2003 standard
set_clause_choice: set_clause_list | multiple_column_assignment;
set_clause_list: set_clause (COMMA set_clause)*;
@@ -1540,6 +1541,7 @@ keyword_as_compat:
| VALUES
// | VIEW
| VIRTUAL
+ | WATERMARK
// | WITH
| WRAPPER
// | WRITE
@@ -1770,6 +1772,7 @@ keyword_compat: (
| VIEW
| VIRTUAL
+ | WATERMARK
| WITH
| WRAPPER
// | WRITE
| XOR
@@ -2157,6 +2160,7 @@ VALUES: V A L U E S;
VARIANT: V A R I A N T;
VIEW: V I E W;
VIRTUAL: V I R T U A L;
+WATERMARK: W A T E R M A R K;
WHEN: W H E N;
WHERE: W H E R E;
WINDOW: W I N D O W;
diff --git a/yql/essentials/sql/v1/complete/sql_complete_ut.cpp b/yql/essentials/sql/v1/complete/sql_complete_ut.cpp
index 604af6008d9..bffe15a869b 100644
--- a/yql/essentials/sql/v1/complete/sql_complete_ut.cpp
+++ b/yql/essentials/sql/v1/complete/sql_complete_ut.cpp
@@ -932,6 +932,7 @@ Y_UNIT_TEST_SUITE(SqlCompleteTests) {
TVector<TCandidate> expected = {
{Keyword, "COLUMNS"},
{Keyword, "SCHEMA"},
+ {Keyword, "WATERMARK AS()", 1},
{HintName, "XLOCK"},
};
UNIT_ASSERT_VALUES_EQUAL(Complete(engine, "REDUCE my_table WITH "), expected);
@@ -940,6 +941,7 @@ Y_UNIT_TEST_SUITE(SqlCompleteTests) {
TVector<TCandidate> expected = {
{Keyword, "COLUMNS"},
{Keyword, "SCHEMA"},
+ {Keyword, "WATERMARK AS()", 1},
{HintName, "XLOCK"},
};
UNIT_ASSERT_VALUES_EQUAL(Complete(engine, "SELECT key FROM my_table WITH "), expected);
@@ -951,6 +953,7 @@ Y_UNIT_TEST_SUITE(SqlCompleteTests) {
{Keyword, "COLUMNS"},
{HintName, "EXPIRATION"},
{Keyword, "SCHEMA"},
+ {Keyword, "WATERMARK AS()", 1},
};
auto engine = MakeSqlCompletionEngineUT();
diff --git a/yql/essentials/sql/v1/query.cpp b/yql/essentials/sql/v1/query.cpp
index ffbfd669ac9..e0da16ff05d 100644
--- a/yql/essentials/sql/v1/query.cpp
+++ b/yql/essentials/sql/v1/query.cpp
@@ -862,6 +862,20 @@ public:
ctx.IncrementMonCounter("sql_errors", "NormalizeHintError");
return false;
}
+
+ if ("watermark" == hintName) {
+ TNodePtr option = Y(BuildQuotedAtom(Pos_, hintName));
+ auto anyColumnSrc = BuildAnyColumnSource(Pos_);
+ for (auto& x : hint.second) {
+ if (!x->Init(ctx, anyColumnSrc.Get())) {
+ return false;
+ }
+ option = L(option, x);
+ }
+ Nodes_.push_back(Q(option));
+ continue;
+ }
+
TNodePtr option = Y(BuildQuotedAtom(Pos_, hintName));
for (auto& x : hint.second) {
if (!x->Init(ctx, src)) {
diff --git a/yql/essentials/sql/v1/select.cpp b/yql/essentials/sql/v1/select.cpp
index 03a8483ce0c..c75f18df30a 100644
--- a/yql/essentials/sql/v1/select.cpp
+++ b/yql/essentials/sql/v1/select.cpp
@@ -3134,6 +3134,31 @@ TSourcePtr BuildSelect(TPosition pos, TSourcePtr source, TNodePtr skipTake) {
return new TSelect(pos, std::move(source), skipTake);
}
+// Minimal ISource implementation whose AddColumn() returns `true` for every
+// column and whose initialization does nothing.
+// NOTE(review): appears intended as the `src` handed to Init() for expressions
+// that are not bound to a real source (see the "watermark" hint handling in
+// query.cpp) - confirm.
+class TAnyColumnSource final : public ISource {
+public:
+    // `explicit` keeps a bare TPosition from converting implicitly into a source.
+    explicit TAnyColumnSource(TPosition pos) : ISource(pos) {}
+
+    // Nothing to initialize; always reports success.
+    bool DoInit(TContext&, ISource*) final {
+        return true;
+    }
+
+    // This source yields no node of its own.
+    TNodePtr Build(TContext&) final {
+        return nullptr;
+    }
+
+    // Returns a new instance constructed with the same position.
+    TNodePtr DoClone() const final {
+        return MakeIntrusive<TAnyColumnSource>(Pos_);
+    }
+
+    // Always returns `true`; the context and the column are not inspected.
+    TMaybe<bool> AddColumn(TContext&, TColumnNode&) final {
+        return {true};
+    }
+};
+
+// Creates a TAnyColumnSource at `pos` and returns it as a generic source pointer.
+TSourcePtr BuildAnyColumnSource(TPosition pos) {
+    TSourcePtr anyColumnSource = new TAnyColumnSource(pos);
+    return anyColumnSource;
+}
+
class TSelectResultNode final: public TAstListNode {
public:
TSelectResultNode(TPosition pos, TSourcePtr source, bool writeResult, bool inSubquery,
diff --git a/yql/essentials/sql/v1/source.h b/yql/essentials/sql/v1/source.h
index 2c42dda0877..9a7a1fa8540 100644
--- a/yql/essentials/sql/v1/source.h
+++ b/yql/essentials/sql/v1/source.h
@@ -276,6 +276,7 @@ namespace NSQLTranslationV1 {
TColumnsSets&& distinctSets
);
TSourcePtr BuildSelect(TPosition pos, TSourcePtr source, TNodePtr skipTake);
+ TSourcePtr BuildAnyColumnSource(TPosition pos);
enum class ReduceMode {
diff --git a/yql/essentials/sql/v1/sql_translation.cpp b/yql/essentials/sql/v1/sql_translation.cpp
index 42e03775303..f4adf24ce62 100644
--- a/yql/essentials/sql/v1/sql_translation.cpp
+++ b/yql/essentials/sql/v1/sql_translation.cpp
@@ -3457,6 +3457,7 @@ bool TSqlTranslation::TableHintImpl(const TRule_table_hint& rule, TTableHints& h
// an_id_hint (EQUALS (type_name_tag | LPAREN type_name_tag (COMMA type_name_tag)* COMMA? RPAREN))?
// | (SCHEMA | COLUMNS) EQUALS? type_name_or_bind
// | SCHEMA EQUALS? LPAREN (struct_arg_positional (COMMA struct_arg_positional)*)? COMMA? RPAREN
+ // | WATERMARK AS LPAREN expr RPAREN
switch (rule.Alt_case()) {
case TRule_table_hint::kAltTableHint1: {
const auto& alt = rule.GetAlt_table_hint1();
@@ -3569,6 +3570,18 @@ bool TSqlTranslation::TableHintImpl(const TRule_table_hint& rule, TTableHints& h
}
}
+ case TRule_table_hint::kAltTableHint4: {
+ const auto& alt = rule.GetAlt_table_hint4();
+ const auto pos = Ctx_.TokenPosition(alt.GetToken1());
+ TColumnRefScope scope(Ctx_, EColumnRefState::Allow);
+ auto expr = TSqlExpression(Ctx_, Mode_).Build(alt.GetRule_expr4());
+ if (!expr) {
+ return false;
+ }
+ hints["watermark"] = { BuildLambda(pos, BuildList(pos, {BuildAtom(pos, "row")}), std::move(expr)) };
+ break;
+ }
+
case TRule_table_hint::ALT_NOT_SET:
Y_ABORT("You should change implementation according to grammar changes");
}
@@ -4490,7 +4503,7 @@ bool TSqlTranslation::FrameBound(const TRule_window_frame_bound& rule, TFrameBou
break;
}
case TRule_window_frame_bound::ALT_NOT_SET:
- Y_ABORT("FrameClause: frame bound not corresond to grammar changes");
+ Y_ABORT("FrameClause: frame bound not correspond to grammar changes");
}
return true;
}
diff --git a/yql/essentials/sql/v1/sql_ut.h b/yql/essentials/sql/v1/sql_ut.h
index 37e06e8708b..fb18b1658f3 100644
--- a/yql/essentials/sql/v1/sql_ut.h
+++ b/yql/essentials/sql/v1/sql_ut.h
@@ -24,7 +24,7 @@ enum class EDebugOutput {
const ui32 PRETTY_FLAGS = NYql::TAstPrintFlags::PerLine | NYql::TAstPrintFlags::ShortQuote |
NYql::TAstPrintFlags::AdaptArbitraryContent;
-inline TString Err2Str(NYql::TAstParseResult& res, EDebugOutput debug = EDebugOutput::None) {
+inline TString Err2Str(const NYql::TAstParseResult& res, EDebugOutput debug = EDebugOutput::None) {
TStringStream s;
res.Issues.PrintTo(s);
diff --git a/yql/essentials/sql/v1/sql_ut_antlr4.h b/yql/essentials/sql/v1/sql_ut_antlr4.h
index 03d804707f0..a8fdd11e257 100644
--- a/yql/essentials/sql/v1/sql_ut_antlr4.h
+++ b/yql/essentials/sql/v1/sql_ut_antlr4.h
@@ -24,7 +24,7 @@ enum class EDebugOutput {
const ui32 PRETTY_FLAGS = NYql::TAstPrintFlags::PerLine | NYql::TAstPrintFlags::ShortQuote |
NYql::TAstPrintFlags::AdaptArbitraryContent;
-inline TString Err2Str(NYql::TAstParseResult& res, EDebugOutput debug = EDebugOutput::None) {
+inline TString Err2Str(const NYql::TAstParseResult& res, EDebugOutput debug = EDebugOutput::None) {
TStringStream s;
res.Issues.PrintTo(s);
diff --git a/yql/essentials/sql/v1/sql_ut_common.h b/yql/essentials/sql/v1/sql_ut_common.h
index c43ef974b97..ac9ebdbc5c6 100644
--- a/yql/essentials/sql/v1/sql_ut_common.h
+++ b/yql/essentials/sql/v1/sql_ut_common.h
@@ -8949,3 +8949,44 @@ Y_UNIT_TEST_SUITE(Aggregation) {
}
}
+
+// Translation checks for the `WATERMARK AS (expr)` table hint.
+Y_UNIT_TEST_SUITE(Watermarks) {
+    // The hint is accepted when the hinted SELECT feeds an INSERT.
+    Y_UNIT_TEST(Insert) {
+        const auto query = R"sql(
+USE plato;
+
+INSERT INTO Output
+SELECT
+ *
+FROM Input
+WITH(
+ SCHEMA(
+ ts Timestamp,
+ ),
+ WATERMARK AS (ts)
+);
+)sql";
+        const auto& result = SqlToYql(query);
+        // Called with EDebugOutput::ToCerr; the returned string is not used.
+        Err2Str(result, EDebugOutput::ToCerr);
+        UNIT_ASSERT(result.IsOk());
+    }
+
+    // The hint is accepted on a standalone SELECT.
+    Y_UNIT_TEST(Select) {
+        const auto query = R"sql(
+USE plato;
+
+SELECT
+ *
+FROM Input
+WITH(
+ SCHEMA(
+ ts Timestamp,
+ ),
+ WATERMARK AS (ts)
+);
+)sql";
+        const auto& result = SqlToYql(query);
+        // Called with EDebugOutput::ToCerr; the returned string is not used.
+        Err2Str(result, EDebugOutput::ToCerr);
+        UNIT_ASSERT(result.IsOk());
+    }
+}
diff --git a/yql/essentials/udfs/language/yql/yql_language_udf.cpp b/yql/essentials/udfs/language/yql/yql_language_udf.cpp
index 3a83dfd3391..d72d74b3a90 100644
--- a/yql/essentials/udfs/language/yql/yql_language_udf.cpp
+++ b/yql/essentials/udfs/language/yql/yql_language_udf.cpp
@@ -122,6 +122,11 @@ private:
Freqs_[std::make_pair(parent, alt.GetToken1().GetValue())] += 1;
break;
}
+ case TRule_table_hint::kAltTableHint4: {
+ const auto& alt = msg.GetAlt_table_hint4();
+ Freqs_[std::make_pair(parent, alt.GetToken1().GetValue())] += 1;
+ break;
+ }
case TRule_table_hint::ALT_NOT_SET:
return;
}