summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorilnaz <[email protected]>2023-06-01 11:19:59 +0300
committerilnaz <[email protected]>2023-06-01 11:19:59 +0300
commit3e093117a5edca4b9a37eb5d8a8bad8b4565f437 (patch)
tree5a57e0b81809aa0a02343616283ec0ace5ab2145
parentad69a1b560522eea94ab9632a6b7176f8e8731df (diff)
Changefeed's RESOLVED_TIMESTAMPS option
-rw-r--r--ydb/core/kqp/provider/yql_kikimr_exec.cpp14
-rw-r--r--ydb/library/yql/sql/v1/format/sql_format_ut.cpp2
-rw-r--r--ydb/library/yql/sql/v1/node.h1
-rw-r--r--ydb/library/yql/sql/v1/query.cpp3
-rw-r--r--ydb/library/yql/sql/v1/sql.cpp6
-rw-r--r--ydb/library/yql/sql/v1/sql_ut.cpp15
6 files changed, 41 insertions, 0 deletions
diff --git a/ydb/core/kqp/provider/yql_kikimr_exec.cpp b/ydb/core/kqp/provider/yql_kikimr_exec.cpp
index aa9913c12ab..6dbb9a615c0 100644
--- a/ydb/core/kqp/provider/yql_kikimr_exec.cpp
+++ b/ydb/core/kqp/provider/yql_kikimr_exec.cpp
@@ -1348,6 +1348,20 @@ public:
);
add_changefeed->set_virtual_timestamps(FromString<bool>(to_lower(value)));
+ } else if (name == "resolved_timestamps") {
+ YQL_ENSURE(setting.Value().Maybe<TCoInterval>());
+ const auto value = FromString<i64>(
+ setting.Value().Cast<TCoInterval>().Literal().Value()
+ );
+
+ if (value <= 0) {
+ ctx.AddError(TIssue(ctx.GetPosition(setting.Name().Pos()),
+ TStringBuilder() << name << " must be positive"));
+ return SyncError();
+ }
+
+ const auto interval = TDuration::FromValue(value);
+ Y_UNUSED(interval); // TODO(ilnaz): implement
} else if (name == "retention_period") {
YQL_ENSURE(setting.Value().Maybe<TCoInterval>());
const auto value = FromString<i64>(
diff --git a/ydb/library/yql/sql/v1/format/sql_format_ut.cpp b/ydb/library/yql/sql/v1/format/sql_format_ut.cpp
index 0d8bb5d30c3..3f5e8fd70f9 100644
--- a/ydb/library/yql/sql/v1/format/sql_format_ut.cpp
+++ b/ydb/library/yql/sql/v1/format/sql_format_ut.cpp
@@ -413,6 +413,8 @@ Y_UNIT_TEST_SUITE(CheckSqlFormatter) {
"ALTER TABLE user\n\tADD CHANGEFEED user WITH (virtual_timestamps = TRUE);\n\n"},
{"alter table user add changefeed user with (virtual_timestamps = fAlSe)",
"ALTER TABLE user\n\tADD CHANGEFEED user WITH (virtual_timestamps = FALSE);\n\n"},
+ {"alter table user add changefeed user with (resolved_timestamps = Interval(\"PT1S\"))",
+ "ALTER TABLE user\n\tADD CHANGEFEED user WITH (resolved_timestamps = Interval(\"PT1S\"));\n\n"},
};
TSetup setup;
diff --git a/ydb/library/yql/sql/v1/node.h b/ydb/library/yql/sql/v1/node.h
index bf63dc24926..ecd4e908633 100644
--- a/ydb/library/yql/sql/v1/node.h
+++ b/ydb/library/yql/sql/v1/node.h
@@ -1195,6 +1195,7 @@ namespace NSQLTranslationV1 {
TNodePtr Format;
TNodePtr InitialScan;
TNodePtr VirtualTimestamps;
+ TNodePtr ResolvedTimestamps;
TNodePtr RetentionPeriod;
TNodePtr AwsRegion;
std::optional<std::variant<TLocalSinkSettings>> SinkSettings;
diff --git a/ydb/library/yql/sql/v1/query.cpp b/ydb/library/yql/sql/v1/query.cpp
index a4d5193335a..274839f9a32 100644
--- a/ydb/library/yql/sql/v1/query.cpp
+++ b/ydb/library/yql/sql/v1/query.cpp
@@ -165,6 +165,9 @@ static INode::TPtr CreateChangefeedDesc(const TChangefeedDescription& desc, cons
if (desc.Settings.VirtualTimestamps) {
settings = node.L(settings, node.Q(node.Y(node.Q("virtual_timestamps"), desc.Settings.VirtualTimestamps)));
}
+ if (desc.Settings.ResolvedTimestamps) {
+ settings = node.L(settings, node.Q(node.Y(node.Q("resolved_timestamps"), desc.Settings.ResolvedTimestamps)));
+ }
if (desc.Settings.RetentionPeriod) {
settings = node.L(settings, node.Q(node.Y(node.Q("retention_period"), desc.Settings.RetentionPeriod)));
}
diff --git a/ydb/library/yql/sql/v1/sql.cpp b/ydb/library/yql/sql/v1/sql.cpp
index f0d82989038..8ccaa593876 100644
--- a/ydb/library/yql/sql/v1/sql.cpp
+++ b/ydb/library/yql/sql/v1/sql.cpp
@@ -1883,6 +1883,12 @@ static bool ChangefeedSettingsEntry(const TRule_changefeed_settings_entry& node,
return false;
}
settings.VirtualTimestamps = exprNode;
+ } else if (to_lower(id.Name) == "resolved_timestamps") {
+ if (exprNode->GetOpName() != "Interval") {
+ ctx.Context().Error() << "Literal of Interval type is expected for " << id.Name;
+ return false;
+ }
+ settings.ResolvedTimestamps = exprNode;
} else if (to_lower(id.Name) == "retention_period") {
if (exprNode->GetOpName() != "Interval") {
ctx.Context().Error() << "Literal of Interval type is expected for " << id.Name;
diff --git a/ydb/library/yql/sql/v1/sql_ut.cpp b/ydb/library/yql/sql/v1/sql_ut.cpp
index b45dc788347..dde1881eb3f 100644
--- a/ydb/library/yql/sql/v1/sql_ut.cpp
+++ b/ydb/library/yql/sql/v1/sql_ut.cpp
@@ -1823,6 +1823,7 @@ Y_UNIT_TEST_SUITE(SqlParsingOnly) {
FORMAT = 'json',
INITIAL_SCAN = TRUE,
VIRTUAL_TIMESTAMPS = FALSE,
+ RESOLVED_TIMESTAMPS = Interval("PT1S"),
RETENTION_PERIOD = Interval("P1D"),
AWS_REGION = 'aws:region'
)
@@ -1841,6 +1842,7 @@ Y_UNIT_TEST_SUITE(SqlParsingOnly) {
UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("true"));
UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("virtual_timestamps"));
UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("false"));
+ UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("resolved_timestamps"));
UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("retention_period"));
UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("aws_region"));
UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("aws:region"));
@@ -3528,6 +3530,19 @@ select FormatType($f());
UNIT_ASSERT_NO_DIFF(Err2Str(res), "<main>:5:101: Error: Literal of Bool type is expected for VIRTUAL_TIMESTAMPS\n");
}
+ Y_UNIT_TEST(InvalidChangefeedResolvedTimestamps) {
+ auto req = R"(
+ USE plato;
+ CREATE TABLE tableName (
+ Key Uint32, PRIMARY KEY (Key),
+ CHANGEFEED feedName WITH (MODE = "KEYS_ONLY", FORMAT = "json", RESOLVED_TIMESTAMPS = "foo")
+ );
+ )";
+ auto res = SqlToYql(req);
+ UNIT_ASSERT(!res.Root);
+ UNIT_ASSERT_NO_DIFF(Err2Str(res), "<main>:5:102: Error: Literal of Interval type is expected for RESOLVED_TIMESTAMPS\n");
+ }
+
Y_UNIT_TEST(InvalidChangefeedRetentionPeriod) {
auto req = R"(
USE plato;