diff options
author | ilnaz <ilnaz@ydb.tech> | 2022-11-24 16:26:32 +0300 |
---|---|---|
committer | ilnaz <ilnaz@ydb.tech> | 2022-11-24 16:26:32 +0300 |
commit | 68f4c0da11f5cf3f264840d2197e36c8e2c6367c (patch) | |
tree | 9b1bc67979988626bad07c9599c1e5433cb85ade | |
parent | 5fbd67db573577fafc46098602461037b0828c71 (diff) | |
download | ydb-68f4c0da11f5cf3f264840d2197e36c8e2c6367c.tar.gz |
RETENTION_PERIOD option for changefeeds
-rw-r--r-- | ydb/core/grpc_services/rpc_alter_table.cpp | 4 | ||||
-rw-r--r-- | ydb/core/kqp/provider/yql_kikimr_exec.cpp | 16 | ||||
-rw-r--r-- | ydb/library/yql/sql/v1/SQLv1.g.in | 5 | ||||
-rw-r--r-- | ydb/library/yql/sql/v1/format/sql_format_ut.cpp | 2 | ||||
-rw-r--r-- | ydb/library/yql/sql/v1/node.h | 1 | ||||
-rw-r--r-- | ydb/library/yql/sql/v1/query.cpp | 3 | ||||
-rw-r--r-- | ydb/library/yql/sql/v1/sql.cpp | 171 | ||||
-rw-r--r-- | ydb/library/yql/sql/v1/sql_ut.cpp | 23 |
8 files changed, 138 insertions, 87 deletions
diff --git a/ydb/core/grpc_services/rpc_alter_table.cpp b/ydb/core/grpc_services/rpc_alter_table.cpp index 45441a567bd..26aca5ef3b5 100644 --- a/ydb/core/grpc_services/rpc_alter_table.cpp +++ b/ydb/core/grpc_services/rpc_alter_table.cpp @@ -462,10 +462,12 @@ private: for (const auto& add : req->add_changefeeds()) { auto op = modifyScheme->MutableCreateCdcStream(); op->SetTableName(name); + if (add.has_retention_period()) { + op->SetRetentionPeriodSeconds(add.retention_period().seconds()); + } StatusIds::StatusCode code; TString error; - if (!FillChangefeedDescription(*op->MutableStreamDescription(), add, code, error)) { NYql::TIssues issues; issues.AddIssue(NYql::TIssue(error)); diff --git a/ydb/core/kqp/provider/yql_kikimr_exec.cpp b/ydb/core/kqp/provider/yql_kikimr_exec.cpp index 05d797862bf..6360ad8063c 100644 --- a/ydb/core/kqp/provider/yql_kikimr_exec.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_exec.cpp @@ -970,6 +970,22 @@ public: ctx.AddError(TIssue(ctx.GetPosition(setting.Name().Pos()), TStringBuilder() << name << " setting is not supported yet")); return SyncError(); + } else if (name == "retention_period") { + YQL_ENSURE(setting.Value().Maybe<TCoInterval>()); + const auto value = FromString<i64>( + setting.Value().Cast<TCoInterval>().Literal().Value() + ); + + if (value <= 0) { + ctx.AddError(TIssue(ctx.GetPosition(setting.Name().Pos()), + TStringBuilder() << name << " must be positive")); + return SyncError(); + } + + const auto duration = TDuration::FromValue(value); + auto& retention = *add_changefeed->mutable_retention_period(); + retention.set_seconds(duration.Seconds()); + retention.set_nanos(duration.NanoSecondsOfSecond()); } else if (name == "local") { // nop } else { diff --git a/ydb/library/yql/sql/v1/SQLv1.g.in b/ydb/library/yql/sql/v1/SQLv1.g.in index 31ac78aebd6..21ffa80530b 100644 --- a/ydb/library/yql/sql/v1/SQLv1.g.in +++ b/ydb/library/yql/sql/v1/SQLv1.g.in @@ -559,10 +559,7 @@ local_index: LOCAL; changefeed: CHANGEFEED an_id WITH LPAREN changefeed_settings RPAREN; changefeed_settings: changefeed_settings_entry (COMMA changefeed_settings_entry)*; changefeed_settings_entry: an_id EQUALS changefeed_setting_value; -changefeed_setting_value: - STRING_VALUE - | bool_value -; +changefeed_setting_value: expr; changefeed_alter_settings: DISABLE | SET LPAREN changefeed_settings RPAREN diff --git a/ydb/library/yql/sql/v1/format/sql_format_ut.cpp b/ydb/library/yql/sql/v1/format/sql_format_ut.cpp index d2c7b9bf61a..fb8af707674 100644 --- a/ydb/library/yql/sql/v1/format/sql_format_ut.cpp +++ b/ydb/library/yql/sql/v1/format/sql_format_ut.cpp @@ -322,6 +322,8 @@ Y_UNIT_TEST_SUITE(CheckSqlFormatter) { "ALTER TABLE user\n\tADD CHANGEFEED user WITH (initial_scan = TRUE);\n"}, {"alter table user add changefeed user with (initial_scan = FaLsE)", "ALTER TABLE user\n\tADD CHANGEFEED user WITH (initial_scan = FALSE);\n"}, + {"alter table user add changefeed user with (retention_period = Interval(\"P1D\"))", + "ALTER TABLE user\n\tADD CHANGEFEED user WITH (retention_period = Interval(\"P1D\"));\n"}, }; TSetup setup; diff --git a/ydb/library/yql/sql/v1/node.h b/ydb/library/yql/sql/v1/node.h index e5591e42b3a..5c86afc9a05 100644 --- a/ydb/library/yql/sql/v1/node.h +++ b/ydb/library/yql/sql/v1/node.h @@ -1153,6 +1153,7 @@ namespace NSQLTranslationV1 { TNodePtr Mode; TNodePtr Format; TNodePtr InitialScan; + TNodePtr RetentionPeriod; std::optional<std::variant<TLocalSinkSettings>> SinkSettings; }; diff --git a/ydb/library/yql/sql/v1/query.cpp b/ydb/library/yql/sql/v1/query.cpp index 359fb2e1b05..f83b66287ba 100644 --- a/ydb/library/yql/sql/v1/query.cpp +++ b/ydb/library/yql/sql/v1/query.cpp @@ -127,6 +127,9 @@ static INode::TPtr CreateChangefeedDesc(const TChangefeedDescription& desc, cons if (desc.Settings.InitialScan) { settings = node.L(settings, node.Q(node.Y(node.Q("initial_scan"), desc.Settings.InitialScan))); } + if (desc.Settings.RetentionPeriod) { + settings = node.L(settings, node.Q(node.Y(node.Q("retention_period"), desc.Settings.RetentionPeriod))); + } if (const auto& sink = desc.Settings.SinkSettings) { switch (sink->index()) { case 0: // local diff --git a/ydb/library/yql/sql/v1/sql.cpp b/ydb/library/yql/sql/v1/sql.cpp index 6cc3554001b..bf91e2128a9 100644 --- a/ydb/library/yql/sql/v1/sql.cpp +++ b/ydb/library/yql/sql/v1/sql.cpp @@ -665,83 +665,6 @@ static bool CreateTableIndex(const TRule_table_index& node, TTranslation& ctx, T return true; } -static bool ChangefeedSettingsEntry(const TRule_changefeed_settings_entry& node, TTranslation& ctx, TChangefeedSettings& settings, bool alter) { - const auto id = IdEx(node.GetRule_an_id1(), ctx); - if (alter) { - // currently we don't support alter settings - ctx.Error() << to_upper(id.Name) << " alter is not supported"; - return false; - } - - const TToken* token = nullptr; - const auto& setting = node.GetRule_changefeed_setting_value3(); - switch (setting.Alt_case()) { - case TRule_changefeed_setting_value::kAltChangefeedSettingValue1: - token = &setting.GetAlt_changefeed_setting_value1().GetToken1(); - break; - case TRule_changefeed_setting_value::kAltChangefeedSettingValue2: - token = &setting.GetAlt_changefeed_setting_value2().GetRule_bool_value1().GetToken1(); - break; - default: - return false; - } - - YQL_ENSURE(token); - const TString value(ctx.Token(*token)); - const auto pos = GetPos(*token); - - if (to_lower(id.Name) == "sink_type") { - auto parsed = StringContent(ctx.Context(), pos, value); - YQL_ENSURE(parsed.Defined()); - if (to_lower(parsed->Content) == "local") { - settings.SinkSettings = TChangefeedSettings::TLocalSinkSettings(); - } else { - ctx.Context().Error() << "Unknown changefeed sink type: " << to_upper(parsed->Content); - return false; - } - } else if (to_lower(id.Name) == "mode") { - settings.Mode = BuildLiteralSmartString(ctx.Context(), value); - } else if (to_lower(id.Name) == "format") { - settings.Format = BuildLiteralSmartString(ctx.Context(), value); - } else if (to_lower(id.Name) == "initial_scan") { - bool v; - if (!TryFromString<bool>(to_lower(value), v)) { - ctx.Context().Error(id.Pos) << "Invalid changefeed setting: " << id.Name; - return false; - } - settings.InitialScan = BuildLiteralBool(pos, v); - } else { - ctx.Context().Error(id.Pos) << "Unknown changefeed setting: " << id.Name; - return false; - } - - return true; -} - -static bool ChangefeedSettings(const TRule_changefeed_settings& node, TTranslation& ctx, TChangefeedSettings& settings, bool alter) { - if (!ChangefeedSettingsEntry(node.GetRule_changefeed_settings_entry1(), ctx, settings, alter)) { - return false; - } - - for (auto& block : node.GetBlock2()) { - if (!ChangefeedSettingsEntry(block.GetRule_changefeed_settings_entry2(), ctx, settings, alter)) { - return false; - } - } - - return true; -} - -static bool CreateChangefeed(const TRule_changefeed& node, TTranslation& ctx, TVector<TChangefeedDescription>& changefeeds) { - changefeeds.emplace_back(IdEx(node.GetRule_an_id2(), ctx)); - - if (!ChangefeedSettings(node.GetRule_changefeed_settings5(), ctx, changefeeds.back().Settings, false)) { - return false; - } - - return true; -} - static std::pair<TString, TString> TableKeyImpl(const std::pair<bool, TString>& nameWithAt, TString view, TTranslation& ctx) { if (nameWithAt.first) { view = "@"; @@ -1848,6 +1771,91 @@ bool TSqlTranslation::FillFamilySettings(const TRule_family_settings& settingsNo return true; } +static bool ChangefeedSettingsEntry(const TRule_changefeed_settings_entry& node, TSqlExpression& ctx, TChangefeedSettings& settings, bool alter) { + const auto id = IdEx(node.GetRule_an_id1(), ctx); + if (alter) { + // currently we don't support alter settings + ctx.Error() << to_upper(id.Name) << " alter is not supported"; + return false; + } + + const auto& setting = node.GetRule_changefeed_setting_value3(); + auto exprNode = ctx.Build(setting.GetRule_expr1()); + + if (!exprNode) { + ctx.Context().Error(id.Pos) << "Invalid changefeed setting: " << id.Name; + return false; + } + + if (to_lower(id.Name) == "sink_type") { + if (!exprNode->IsLiteral() || exprNode->GetLiteralType() != "String") { + ctx.Context().Error() << "Literal of String type is expected for " << id.Name; + return false; + } + + const auto value = exprNode->GetLiteralValue(); + if (to_lower(value) == "local") { + settings.SinkSettings = TChangefeedSettings::TLocalSinkSettings(); + } else { + ctx.Context().Error() << "Unknown changefeed sink type: " << value; + return false; + } + } else if (to_lower(id.Name) == "mode") { + if (!exprNode->IsLiteral() || exprNode->GetLiteralType() != "String") { + ctx.Context().Error() << "Literal of String type is expected for " << id.Name; + return false; + } + settings.Mode = exprNode; + } else if (to_lower(id.Name) == "format") { + if (!exprNode->IsLiteral() || exprNode->GetLiteralType() != "String") { + ctx.Context().Error() << "Literal of String type is expected for " << id.Name; + return false; + } + settings.Format = exprNode; + } else if (to_lower(id.Name) == "initial_scan") { + if (!exprNode->IsLiteral() || exprNode->GetLiteralType() != "Bool") { + ctx.Context().Error() << "Literal of Bool type is expected for " << id.Name; + return false; + } + settings.InitialScan = exprNode; + } else if (to_lower(id.Name) == "retention_period") { + if (exprNode->GetOpName() != "Interval") { + ctx.Context().Error() << "Literal of Interval type is expected for " << id.Name; + return false; + } + settings.RetentionPeriod = exprNode; + } else { + ctx.Context().Error(id.Pos) << "Unknown changefeed setting: " << id.Name; + return false; + } + + return true; +} + +static bool ChangefeedSettings(const TRule_changefeed_settings& node, TSqlExpression& ctx, TChangefeedSettings& settings, bool alter) { + if (!ChangefeedSettingsEntry(node.GetRule_changefeed_settings_entry1(), ctx, settings, alter)) { + return false; + } + + for (auto& block : node.GetBlock2()) { + if (!ChangefeedSettingsEntry(block.GetRule_changefeed_settings_entry2(), ctx, settings, alter)) { + return false; + } + } + + return true; +} + +static bool CreateChangefeed(const TRule_changefeed& node, TSqlExpression& ctx, TVector<TChangefeedDescription>& changefeeds) { + changefeeds.emplace_back(IdEx(node.GetRule_an_id2(), ctx)); + + if (!ChangefeedSettings(node.GetRule_changefeed_settings5(), ctx, changefeeds.back().Settings, false)) { + return false; + } + + return true; +} + bool TSqlTranslation::CreateTableEntry(const TRule_create_table_entry& node, TCreateTableParameters& params) { switch (node.Alt_case()) { @@ -1965,7 +1973,8 @@ bool TSqlTranslation::CreateTableEntry(const TRule_create_table_entry& node, TCr { // changefeed auto& changefeed = node.GetAlt_create_table_entry5().GetRule_changefeed1(); - if (!CreateChangefeed(changefeed, *this, params.Changefeeds)) { + TSqlExpression expr(Ctx, Mode); + if (!CreateChangefeed(changefeed, expr, params.Changefeeds)) { return false; } break; @@ -9497,7 +9506,8 @@ void TSqlQuery::AlterTableRenameIndexTo(const TRule_alter_table_rename_index_to& } bool TSqlQuery::AlterTableAddChangefeed(const TRule_alter_table_add_changefeed& node, TAlterTableParameters& params) { - return CreateChangefeed(node.GetRule_changefeed2(), *this, params.AddChangefeeds); + TSqlExpression expr(Ctx, Mode); + return CreateChangefeed(node.GetRule_changefeed2(), expr, params.AddChangefeeds); } bool TSqlQuery::AlterTableAlterChangefeed(const TRule_alter_table_alter_changefeed& node, TAlterTableParameters& params) { @@ -9513,7 +9523,8 @@ bool TSqlQuery::AlterTableAlterChangefeed(const TRule_alter_table_alter_changefe case TRule_changefeed_alter_settings::kAltChangefeedAlterSettings2: { // SET const auto& rule = alter.GetAlt_changefeed_alter_settings2().GetRule_changefeed_settings3(); - if (!ChangefeedSettings(rule, *this, params.AlterChangefeeds.back().Settings, true)) { + TSqlExpression expr(Ctx, Mode); + if (!ChangefeedSettings(rule, expr, params.AlterChangefeeds.back().Settings, true)) { return false; } break; diff --git a/ydb/library/yql/sql/v1/sql_ut.cpp b/ydb/library/yql/sql/v1/sql_ut.cpp index 8b39f1e0b6d..87f518a6bf2 100644 --- a/ydb/library/yql/sql/v1/sql_ut.cpp +++ b/ydb/library/yql/sql/v1/sql_ut.cpp @@ -1622,7 +1622,12 @@ Y_UNIT_TEST_SUITE(SqlParsingOnly) { auto res = SqlToYql(R"( USE plato; CREATE TABLE tableName ( Key Uint32, PRIMARY KEY (Key), - CHANGEFEED feedName WITH (MODE = 'KEYS_ONLY', FORMAT = 'json', INITIAL_SCAN = TRUE) + CHANGEFEED feedName WITH ( + MODE = 'KEYS_ONLY', + FORMAT = 'json', + INITIAL_SCAN = TRUE, + RETENTION_PERIOD = Interval("P1D") + ) ); )"); UNIT_ASSERT_C(res.Root, Err2Str(res)); @@ -1636,6 +1641,7 @@ Y_UNIT_TEST_SUITE(SqlParsingOnly) { UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("json")); UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("initial_scan")); UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("true")); + UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("retention_period")); } }; @@ -3269,7 +3275,20 @@ select FormatType($f()); )"; auto res = SqlToYql(req); UNIT_ASSERT(!res.Root); - UNIT_ASSERT_NO_DIFF(Err2Str(res), "<main>:5:80: Error: Invalid changefeed setting: INITIAL_SCAN\n"); + UNIT_ASSERT_NO_DIFF(Err2Str(res), "<main>:5:95: Error: Literal of Bool type is expected for INITIAL_SCAN\n"); + } + + Y_UNIT_TEST(InvalidChangefeedRetentionPeriod) { + auto req = R"( + USE plato; + CREATE TABLE tableName ( + Key Uint32, PRIMARY KEY (Key), + CHANGEFEED feedName WITH (MODE = "KEYS_ONLY", FORMAT = "json", RETENTION_PERIOD = "foo") + ); + )"; + auto res = SqlToYql(req); + UNIT_ASSERT(!res.Root); + UNIT_ASSERT_NO_DIFF(Err2Str(res), "<main>:5:99: Error: Literal of Interval type is expected for RETENTION_PERIOD\n"); } Y_UNIT_TEST(ErrJoinWithGroupingSetsWithoutCorrelationName) { |