diff options
author | ilnaz <[email protected]> | 2023-06-01 11:19:59 +0300 |
---|---|---|
committer | ilnaz <[email protected]> | 2023-06-01 11:19:59 +0300 |
commit | 3e093117a5edca4b9a37eb5d8a8bad8b4565f437 (patch) | |
tree | 5a57e0b81809aa0a02343616283ec0ace5ab2145 | |
parent | ad69a1b560522eea94ab9632a6b7176f8e8731df (diff) |
Changefeed's RESOLVED_TIMESTAMPS option
-rw-r--r-- | ydb/core/kqp/provider/yql_kikimr_exec.cpp | 14 | ||||
-rw-r--r-- | ydb/library/yql/sql/v1/format/sql_format_ut.cpp | 2 | ||||
-rw-r--r-- | ydb/library/yql/sql/v1/node.h | 1 | ||||
-rw-r--r-- | ydb/library/yql/sql/v1/query.cpp | 3 | ||||
-rw-r--r-- | ydb/library/yql/sql/v1/sql.cpp | 6 | ||||
-rw-r--r-- | ydb/library/yql/sql/v1/sql_ut.cpp | 15 |
6 files changed, 41 insertions, 0 deletions
diff --git a/ydb/core/kqp/provider/yql_kikimr_exec.cpp b/ydb/core/kqp/provider/yql_kikimr_exec.cpp index aa9913c12ab..6dbb9a615c0 100644 --- a/ydb/core/kqp/provider/yql_kikimr_exec.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_exec.cpp @@ -1348,6 +1348,20 @@ public: ); add_changefeed->set_virtual_timestamps(FromString<bool>(to_lower(value))); + } else if (name == "resolved_timestamps") { + YQL_ENSURE(setting.Value().Maybe<TCoInterval>()); + const auto value = FromString<i64>( + setting.Value().Cast<TCoInterval>().Literal().Value() + ); + + if (value <= 0) { + ctx.AddError(TIssue(ctx.GetPosition(setting.Name().Pos()), + TStringBuilder() << name << " must be positive")); + return SyncError(); + } + + const auto interval = TDuration::FromValue(value); + Y_UNUSED(interval); // TODO(ilnaz): implement } else if (name == "retention_period") { YQL_ENSURE(setting.Value().Maybe<TCoInterval>()); const auto value = FromString<i64>( diff --git a/ydb/library/yql/sql/v1/format/sql_format_ut.cpp b/ydb/library/yql/sql/v1/format/sql_format_ut.cpp index 0d8bb5d30c3..3f5e8fd70f9 100644 --- a/ydb/library/yql/sql/v1/format/sql_format_ut.cpp +++ b/ydb/library/yql/sql/v1/format/sql_format_ut.cpp @@ -413,6 +413,8 @@ Y_UNIT_TEST_SUITE(CheckSqlFormatter) { "ALTER TABLE user\n\tADD CHANGEFEED user WITH (virtual_timestamps = TRUE);\n\n"}, {"alter table user add changefeed user with (virtual_timestamps = fAlSe)", "ALTER TABLE user\n\tADD CHANGEFEED user WITH (virtual_timestamps = FALSE);\n\n"}, + {"alter table user add changefeed user with (resolved_timestamps = Interval(\"PT1S\"))", + "ALTER TABLE user\n\tADD CHANGEFEED user WITH (resolved_timestamps = Interval(\"PT1S\"));\n\n"}, }; TSetup setup; diff --git a/ydb/library/yql/sql/v1/node.h b/ydb/library/yql/sql/v1/node.h index bf63dc24926..ecd4e908633 100644 --- a/ydb/library/yql/sql/v1/node.h +++ b/ydb/library/yql/sql/v1/node.h @@ -1195,6 +1195,7 @@ namespace NSQLTranslationV1 { TNodePtr Format; TNodePtr InitialScan; TNodePtr VirtualTimestamps; + TNodePtr ResolvedTimestamps; TNodePtr RetentionPeriod; TNodePtr AwsRegion; std::optional<std::variant<TLocalSinkSettings>> SinkSettings; diff --git a/ydb/library/yql/sql/v1/query.cpp b/ydb/library/yql/sql/v1/query.cpp index a4d5193335a..274839f9a32 100644 --- a/ydb/library/yql/sql/v1/query.cpp +++ b/ydb/library/yql/sql/v1/query.cpp @@ -165,6 +165,9 @@ static INode::TPtr CreateChangefeedDesc(const TChangefeedDescription& desc, cons if (desc.Settings.VirtualTimestamps) { settings = node.L(settings, node.Q(node.Y(node.Q("virtual_timestamps"), desc.Settings.VirtualTimestamps))); } + if (desc.Settings.ResolvedTimestamps) { + settings = node.L(settings, node.Q(node.Y(node.Q("resolved_timestamps"), desc.Settings.ResolvedTimestamps))); + } if (desc.Settings.RetentionPeriod) { settings = node.L(settings, node.Q(node.Y(node.Q("retention_period"), desc.Settings.RetentionPeriod))); } diff --git a/ydb/library/yql/sql/v1/sql.cpp b/ydb/library/yql/sql/v1/sql.cpp index f0d82989038..8ccaa593876 100644 --- a/ydb/library/yql/sql/v1/sql.cpp +++ b/ydb/library/yql/sql/v1/sql.cpp @@ -1883,6 +1883,12 @@ static bool ChangefeedSettingsEntry(const TRule_changefeed_settings_entry& node, return false; } settings.VirtualTimestamps = exprNode; + } else if (to_lower(id.Name) == "resolved_timestamps") { + if (exprNode->GetOpName() != "Interval") { + ctx.Context().Error() << "Literal of Interval type is expected for " << id.Name; + return false; + } + settings.ResolvedTimestamps = exprNode; } else if (to_lower(id.Name) == "retention_period") { if (exprNode->GetOpName() != "Interval") { ctx.Context().Error() << "Literal of Interval type is expected for " << id.Name; diff --git a/ydb/library/yql/sql/v1/sql_ut.cpp b/ydb/library/yql/sql/v1/sql_ut.cpp index b45dc788347..dde1881eb3f 100644 --- a/ydb/library/yql/sql/v1/sql_ut.cpp +++ b/ydb/library/yql/sql/v1/sql_ut.cpp @@ -1823,6 +1823,7 @@ Y_UNIT_TEST_SUITE(SqlParsingOnly) { FORMAT = 'json', INITIAL_SCAN = TRUE, VIRTUAL_TIMESTAMPS = FALSE, + RESOLVED_TIMESTAMPS = Interval("PT1S"), RETENTION_PERIOD = Interval("P1D"), AWS_REGION = 'aws:region' ) @@ -1841,6 +1842,7 @@ Y_UNIT_TEST_SUITE(SqlParsingOnly) { UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("true")); UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("virtual_timestamps")); UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("false")); + UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("resolved_timestamps")); UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("retention_period")); UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("aws_region")); UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("aws:region")); @@ -3528,6 +3530,19 @@ select FormatType($f()); UNIT_ASSERT_NO_DIFF(Err2Str(res), "<main>:5:101: Error: Literal of Bool type is expected for VIRTUAL_TIMESTAMPS\n"); } + Y_UNIT_TEST(InvalidChangefeedResolvedTimestamps) { + auto req = R"( + USE plato; + CREATE TABLE tableName ( + Key Uint32, PRIMARY KEY (Key), + CHANGEFEED feedName WITH (MODE = "KEYS_ONLY", FORMAT = "json", RESOLVED_TIMESTAMPS = "foo") + ); + )"; + auto res = SqlToYql(req); + UNIT_ASSERT(!res.Root); + UNIT_ASSERT_NO_DIFF(Err2Str(res), "<main>:5:102: Error: Literal of Interval type is expected for RESOLVED_TIMESTAMPS\n"); + } + Y_UNIT_TEST(InvalidChangefeedRetentionPeriod) { auto req = R"( USE plato; |