aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorilnaz <ilnaz@ydb.tech>2022-11-24 16:26:32 +0300
committerilnaz <ilnaz@ydb.tech>2022-11-24 16:26:32 +0300
commit68f4c0da11f5cf3f264840d2197e36c8e2c6367c (patch)
tree9b1bc67979988626bad07c9599c1e5433cb85ade
parent5fbd67db573577fafc46098602461037b0828c71 (diff)
downloadydb-68f4c0da11f5cf3f264840d2197e36c8e2c6367c.tar.gz
RETENTION_PERIOD option for changefeeds
-rw-r--r--ydb/core/grpc_services/rpc_alter_table.cpp4
-rw-r--r--ydb/core/kqp/provider/yql_kikimr_exec.cpp16
-rw-r--r--ydb/library/yql/sql/v1/SQLv1.g.in5
-rw-r--r--ydb/library/yql/sql/v1/format/sql_format_ut.cpp2
-rw-r--r--ydb/library/yql/sql/v1/node.h1
-rw-r--r--ydb/library/yql/sql/v1/query.cpp3
-rw-r--r--ydb/library/yql/sql/v1/sql.cpp171
-rw-r--r--ydb/library/yql/sql/v1/sql_ut.cpp23
8 files changed, 138 insertions, 87 deletions
diff --git a/ydb/core/grpc_services/rpc_alter_table.cpp b/ydb/core/grpc_services/rpc_alter_table.cpp
index 45441a567bd..26aca5ef3b5 100644
--- a/ydb/core/grpc_services/rpc_alter_table.cpp
+++ b/ydb/core/grpc_services/rpc_alter_table.cpp
@@ -462,10 +462,12 @@ private:
for (const auto& add : req->add_changefeeds()) {
auto op = modifyScheme->MutableCreateCdcStream();
op->SetTableName(name);
+ if (add.has_retention_period()) {
+ op->SetRetentionPeriodSeconds(add.retention_period().seconds());
+ }
StatusIds::StatusCode code;
TString error;
-
if (!FillChangefeedDescription(*op->MutableStreamDescription(), add, code, error)) {
NYql::TIssues issues;
issues.AddIssue(NYql::TIssue(error));
diff --git a/ydb/core/kqp/provider/yql_kikimr_exec.cpp b/ydb/core/kqp/provider/yql_kikimr_exec.cpp
index 05d797862bf..6360ad8063c 100644
--- a/ydb/core/kqp/provider/yql_kikimr_exec.cpp
+++ b/ydb/core/kqp/provider/yql_kikimr_exec.cpp
@@ -970,6 +970,22 @@ public:
ctx.AddError(TIssue(ctx.GetPosition(setting.Name().Pos()),
TStringBuilder() << name << " setting is not supported yet"));
return SyncError();
+ } else if (name == "retention_period") {
+ YQL_ENSURE(setting.Value().Maybe<TCoInterval>());
+ const auto value = FromString<i64>(
+ setting.Value().Cast<TCoInterval>().Literal().Value()
+ );
+
+ if (value <= 0) {
+ ctx.AddError(TIssue(ctx.GetPosition(setting.Name().Pos()),
+ TStringBuilder() << name << " must be positive"));
+ return SyncError();
+ }
+
+ const auto duration = TDuration::FromValue(value);
+ auto& retention = *add_changefeed->mutable_retention_period();
+ retention.set_seconds(duration.Seconds());
+ retention.set_nanos(duration.NanoSecondsOfSecond());
} else if (name == "local") {
// nop
} else {
diff --git a/ydb/library/yql/sql/v1/SQLv1.g.in b/ydb/library/yql/sql/v1/SQLv1.g.in
index 31ac78aebd6..21ffa80530b 100644
--- a/ydb/library/yql/sql/v1/SQLv1.g.in
+++ b/ydb/library/yql/sql/v1/SQLv1.g.in
@@ -559,10 +559,7 @@ local_index: LOCAL;
changefeed: CHANGEFEED an_id WITH LPAREN changefeed_settings RPAREN;
changefeed_settings: changefeed_settings_entry (COMMA changefeed_settings_entry)*;
changefeed_settings_entry: an_id EQUALS changefeed_setting_value;
-changefeed_setting_value:
- STRING_VALUE
- | bool_value
-;
+changefeed_setting_value: expr;
changefeed_alter_settings:
DISABLE
| SET LPAREN changefeed_settings RPAREN
diff --git a/ydb/library/yql/sql/v1/format/sql_format_ut.cpp b/ydb/library/yql/sql/v1/format/sql_format_ut.cpp
index d2c7b9bf61a..fb8af707674 100644
--- a/ydb/library/yql/sql/v1/format/sql_format_ut.cpp
+++ b/ydb/library/yql/sql/v1/format/sql_format_ut.cpp
@@ -322,6 +322,8 @@ Y_UNIT_TEST_SUITE(CheckSqlFormatter) {
"ALTER TABLE user\n\tADD CHANGEFEED user WITH (initial_scan = TRUE);\n"},
{"alter table user add changefeed user with (initial_scan = FaLsE)",
"ALTER TABLE user\n\tADD CHANGEFEED user WITH (initial_scan = FALSE);\n"},
+ {"alter table user add changefeed user with (retention_period = Interval(\"P1D\"))",
+ "ALTER TABLE user\n\tADD CHANGEFEED user WITH (retention_period = Interval(\"P1D\"));\n"},
};
TSetup setup;
diff --git a/ydb/library/yql/sql/v1/node.h b/ydb/library/yql/sql/v1/node.h
index e5591e42b3a..5c86afc9a05 100644
--- a/ydb/library/yql/sql/v1/node.h
+++ b/ydb/library/yql/sql/v1/node.h
@@ -1153,6 +1153,7 @@ namespace NSQLTranslationV1 {
TNodePtr Mode;
TNodePtr Format;
TNodePtr InitialScan;
+ TNodePtr RetentionPeriod;
std::optional<std::variant<TLocalSinkSettings>> SinkSettings;
};
diff --git a/ydb/library/yql/sql/v1/query.cpp b/ydb/library/yql/sql/v1/query.cpp
index 359fb2e1b05..f83b66287ba 100644
--- a/ydb/library/yql/sql/v1/query.cpp
+++ b/ydb/library/yql/sql/v1/query.cpp
@@ -127,6 +127,9 @@ static INode::TPtr CreateChangefeedDesc(const TChangefeedDescription& desc, cons
if (desc.Settings.InitialScan) {
settings = node.L(settings, node.Q(node.Y(node.Q("initial_scan"), desc.Settings.InitialScan)));
}
+ if (desc.Settings.RetentionPeriod) {
+ settings = node.L(settings, node.Q(node.Y(node.Q("retention_period"), desc.Settings.RetentionPeriod)));
+ }
if (const auto& sink = desc.Settings.SinkSettings) {
switch (sink->index()) {
case 0: // local
diff --git a/ydb/library/yql/sql/v1/sql.cpp b/ydb/library/yql/sql/v1/sql.cpp
index 6cc3554001b..bf91e2128a9 100644
--- a/ydb/library/yql/sql/v1/sql.cpp
+++ b/ydb/library/yql/sql/v1/sql.cpp
@@ -665,83 +665,6 @@ static bool CreateTableIndex(const TRule_table_index& node, TTranslation& ctx, T
return true;
}
-static bool ChangefeedSettingsEntry(const TRule_changefeed_settings_entry& node, TTranslation& ctx, TChangefeedSettings& settings, bool alter) {
- const auto id = IdEx(node.GetRule_an_id1(), ctx);
- if (alter) {
- // currently we don't support alter settings
- ctx.Error() << to_upper(id.Name) << " alter is not supported";
- return false;
- }
-
- const TToken* token = nullptr;
- const auto& setting = node.GetRule_changefeed_setting_value3();
- switch (setting.Alt_case()) {
- case TRule_changefeed_setting_value::kAltChangefeedSettingValue1:
- token = &setting.GetAlt_changefeed_setting_value1().GetToken1();
- break;
- case TRule_changefeed_setting_value::kAltChangefeedSettingValue2:
- token = &setting.GetAlt_changefeed_setting_value2().GetRule_bool_value1().GetToken1();
- break;
- default:
- return false;
- }
-
- YQL_ENSURE(token);
- const TString value(ctx.Token(*token));
- const auto pos = GetPos(*token);
-
- if (to_lower(id.Name) == "sink_type") {
- auto parsed = StringContent(ctx.Context(), pos, value);
- YQL_ENSURE(parsed.Defined());
- if (to_lower(parsed->Content) == "local") {
- settings.SinkSettings = TChangefeedSettings::TLocalSinkSettings();
- } else {
- ctx.Context().Error() << "Unknown changefeed sink type: " << to_upper(parsed->Content);
- return false;
- }
- } else if (to_lower(id.Name) == "mode") {
- settings.Mode = BuildLiteralSmartString(ctx.Context(), value);
- } else if (to_lower(id.Name) == "format") {
- settings.Format = BuildLiteralSmartString(ctx.Context(), value);
- } else if (to_lower(id.Name) == "initial_scan") {
- bool v;
- if (!TryFromString<bool>(to_lower(value), v)) {
- ctx.Context().Error(id.Pos) << "Invalid changefeed setting: " << id.Name;
- return false;
- }
- settings.InitialScan = BuildLiteralBool(pos, v);
- } else {
- ctx.Context().Error(id.Pos) << "Unknown changefeed setting: " << id.Name;
- return false;
- }
-
- return true;
-}
-
-static bool ChangefeedSettings(const TRule_changefeed_settings& node, TTranslation& ctx, TChangefeedSettings& settings, bool alter) {
- if (!ChangefeedSettingsEntry(node.GetRule_changefeed_settings_entry1(), ctx, settings, alter)) {
- return false;
- }
-
- for (auto& block : node.GetBlock2()) {
- if (!ChangefeedSettingsEntry(block.GetRule_changefeed_settings_entry2(), ctx, settings, alter)) {
- return false;
- }
- }
-
- return true;
-}
-
-static bool CreateChangefeed(const TRule_changefeed& node, TTranslation& ctx, TVector<TChangefeedDescription>& changefeeds) {
- changefeeds.emplace_back(IdEx(node.GetRule_an_id2(), ctx));
-
- if (!ChangefeedSettings(node.GetRule_changefeed_settings5(), ctx, changefeeds.back().Settings, false)) {
- return false;
- }
-
- return true;
-}
-
static std::pair<TString, TString> TableKeyImpl(const std::pair<bool, TString>& nameWithAt, TString view, TTranslation& ctx) {
if (nameWithAt.first) {
view = "@";
@@ -1848,6 +1771,91 @@ bool TSqlTranslation::FillFamilySettings(const TRule_family_settings& settingsNo
return true;
}
+static bool ChangefeedSettingsEntry(const TRule_changefeed_settings_entry& node, TSqlExpression& ctx, TChangefeedSettings& settings, bool alter) {
+ const auto id = IdEx(node.GetRule_an_id1(), ctx);
+ if (alter) {
+ // currently we don't support alter settings
+ ctx.Error() << to_upper(id.Name) << " alter is not supported";
+ return false;
+ }
+
+ const auto& setting = node.GetRule_changefeed_setting_value3();
+ auto exprNode = ctx.Build(setting.GetRule_expr1());
+
+ if (!exprNode) {
+ ctx.Context().Error(id.Pos) << "Invalid changefeed setting: " << id.Name;
+ return false;
+ }
+
+ if (to_lower(id.Name) == "sink_type") {
+ if (!exprNode->IsLiteral() || exprNode->GetLiteralType() != "String") {
+ ctx.Context().Error() << "Literal of String type is expected for " << id.Name;
+ return false;
+ }
+
+ const auto value = exprNode->GetLiteralValue();
+ if (to_lower(value) == "local") {
+ settings.SinkSettings = TChangefeedSettings::TLocalSinkSettings();
+ } else {
+ ctx.Context().Error() << "Unknown changefeed sink type: " << value;
+ return false;
+ }
+ } else if (to_lower(id.Name) == "mode") {
+ if (!exprNode->IsLiteral() || exprNode->GetLiteralType() != "String") {
+ ctx.Context().Error() << "Literal of String type is expected for " << id.Name;
+ return false;
+ }
+ settings.Mode = exprNode;
+ } else if (to_lower(id.Name) == "format") {
+ if (!exprNode->IsLiteral() || exprNode->GetLiteralType() != "String") {
+ ctx.Context().Error() << "Literal of String type is expected for " << id.Name;
+ return false;
+ }
+ settings.Format = exprNode;
+ } else if (to_lower(id.Name) == "initial_scan") {
+ if (!exprNode->IsLiteral() || exprNode->GetLiteralType() != "Bool") {
+ ctx.Context().Error() << "Literal of Bool type is expected for " << id.Name;
+ return false;
+ }
+ settings.InitialScan = exprNode;
+ } else if (to_lower(id.Name) == "retention_period") {
+ if (exprNode->GetOpName() != "Interval") {
+ ctx.Context().Error() << "Literal of Interval type is expected for " << id.Name;
+ return false;
+ }
+ settings.RetentionPeriod = exprNode;
+ } else {
+ ctx.Context().Error(id.Pos) << "Unknown changefeed setting: " << id.Name;
+ return false;
+ }
+
+ return true;
+}
+
+static bool ChangefeedSettings(const TRule_changefeed_settings& node, TSqlExpression& ctx, TChangefeedSettings& settings, bool alter) {
+ if (!ChangefeedSettingsEntry(node.GetRule_changefeed_settings_entry1(), ctx, settings, alter)) {
+ return false;
+ }
+
+ for (auto& block : node.GetBlock2()) {
+ if (!ChangefeedSettingsEntry(block.GetRule_changefeed_settings_entry2(), ctx, settings, alter)) {
+ return false;
+ }
+ }
+
+ return true;
+}
+
+static bool CreateChangefeed(const TRule_changefeed& node, TSqlExpression& ctx, TVector<TChangefeedDescription>& changefeeds) {
+ changefeeds.emplace_back(IdEx(node.GetRule_an_id2(), ctx));
+
+ if (!ChangefeedSettings(node.GetRule_changefeed_settings5(), ctx, changefeeds.back().Settings, false)) {
+ return false;
+ }
+
+ return true;
+}
+
bool TSqlTranslation::CreateTableEntry(const TRule_create_table_entry& node, TCreateTableParameters& params)
{
switch (node.Alt_case()) {
@@ -1965,7 +1973,8 @@ bool TSqlTranslation::CreateTableEntry(const TRule_create_table_entry& node, TCr
{
// changefeed
auto& changefeed = node.GetAlt_create_table_entry5().GetRule_changefeed1();
- if (!CreateChangefeed(changefeed, *this, params.Changefeeds)) {
+ TSqlExpression expr(Ctx, Mode);
+ if (!CreateChangefeed(changefeed, expr, params.Changefeeds)) {
return false;
}
break;
@@ -9497,7 +9506,8 @@ void TSqlQuery::AlterTableRenameIndexTo(const TRule_alter_table_rename_index_to&
}
bool TSqlQuery::AlterTableAddChangefeed(const TRule_alter_table_add_changefeed& node, TAlterTableParameters& params) {
- return CreateChangefeed(node.GetRule_changefeed2(), *this, params.AddChangefeeds);
+ TSqlExpression expr(Ctx, Mode);
+ return CreateChangefeed(node.GetRule_changefeed2(), expr, params.AddChangefeeds);
}
bool TSqlQuery::AlterTableAlterChangefeed(const TRule_alter_table_alter_changefeed& node, TAlterTableParameters& params) {
@@ -9513,7 +9523,8 @@ bool TSqlQuery::AlterTableAlterChangefeed(const TRule_alter_table_alter_changefe
case TRule_changefeed_alter_settings::kAltChangefeedAlterSettings2: {
// SET
const auto& rule = alter.GetAlt_changefeed_alter_settings2().GetRule_changefeed_settings3();
- if (!ChangefeedSettings(rule, *this, params.AlterChangefeeds.back().Settings, true)) {
+ TSqlExpression expr(Ctx, Mode);
+ if (!ChangefeedSettings(rule, expr, params.AlterChangefeeds.back().Settings, true)) {
return false;
}
break;
diff --git a/ydb/library/yql/sql/v1/sql_ut.cpp b/ydb/library/yql/sql/v1/sql_ut.cpp
index 8b39f1e0b6d..87f518a6bf2 100644
--- a/ydb/library/yql/sql/v1/sql_ut.cpp
+++ b/ydb/library/yql/sql/v1/sql_ut.cpp
@@ -1622,7 +1622,12 @@ Y_UNIT_TEST_SUITE(SqlParsingOnly) {
auto res = SqlToYql(R"( USE plato;
CREATE TABLE tableName (
Key Uint32, PRIMARY KEY (Key),
- CHANGEFEED feedName WITH (MODE = 'KEYS_ONLY', FORMAT = 'json', INITIAL_SCAN = TRUE)
+ CHANGEFEED feedName WITH (
+ MODE = 'KEYS_ONLY',
+ FORMAT = 'json',
+ INITIAL_SCAN = TRUE,
+ RETENTION_PERIOD = Interval("P1D")
+ )
);
)");
UNIT_ASSERT_C(res.Root, Err2Str(res));
@@ -1636,6 +1641,7 @@ Y_UNIT_TEST_SUITE(SqlParsingOnly) {
UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("json"));
UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("initial_scan"));
UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("true"));
+ UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("retention_period"));
}
};
@@ -3269,7 +3275,20 @@ select FormatType($f());
)";
auto res = SqlToYql(req);
UNIT_ASSERT(!res.Root);
- UNIT_ASSERT_NO_DIFF(Err2Str(res), "<main>:5:80: Error: Invalid changefeed setting: INITIAL_SCAN\n");
+ UNIT_ASSERT_NO_DIFF(Err2Str(res), "<main>:5:95: Error: Literal of Bool type is expected for INITIAL_SCAN\n");
+ }
+
+ Y_UNIT_TEST(InvalidChangefeedRetentionPeriod) {
+ auto req = R"(
+ USE plato;
+ CREATE TABLE tableName (
+ Key Uint32, PRIMARY KEY (Key),
+ CHANGEFEED feedName WITH (MODE = "KEYS_ONLY", FORMAT = "json", RETENTION_PERIOD = "foo")
+ );
+ )";
+ auto res = SqlToYql(req);
+ UNIT_ASSERT(!res.Root);
+ UNIT_ASSERT_NO_DIFF(Err2Str(res), "<main>:5:99: Error: Literal of Interval type is expected for RETENTION_PERIOD\n");
}
Y_UNIT_TEST(ErrJoinWithGroupingSetsWithoutCorrelationName) {