diff options
author | ilnaz <ilnaz@yandex-team.com> | 2024-12-03 13:17:06 +0300 |
---|---|---|
committer | ilnaz <ilnaz@yandex-team.com> | 2024-12-03 13:33:17 +0300 |
commit | 556c8da53b8579d7aceb68fe8fa3513851464a75 (patch) | |
tree | 549beca12e961d2c2fbee142cc6856ae0a22b336 | |
parent | 6e7f04fb442b3763c07903c984fed7e40dc0dd5f (diff) | |
download | ydb-556c8da53b8579d7aceb68fe8fa3513851464a75.tar.gz |
New options for ASYNC REPLICATION
commit_hash:7ee0e4b59035ed5c8dedc69e00f95457eca65b88
-rw-r--r-- | yql/essentials/sql/v1/SQLv1.g.in | 2 | ||||
-rw-r--r-- | yql/essentials/sql/v1/SQLv1Antlr4.g.in | 2 | ||||
-rw-r--r-- | yql/essentials/sql/v1/sql_query.cpp | 45 | ||||
-rw-r--r-- | yql/essentials/sql/v1/sql_ut.cpp | 50 | ||||
-rw-r--r-- | yql/essentials/sql/v1/sql_ut_antlr4.cpp | 50 |
5 files changed, 123 insertions, 26 deletions
diff --git a/yql/essentials/sql/v1/SQLv1.g.in b/yql/essentials/sql/v1/SQLv1.g.in index b31594eba3..e9685c5094 100644 --- a/yql/essentials/sql/v1/SQLv1.g.in +++ b/yql/essentials/sql/v1/SQLv1.g.in @@ -893,7 +893,7 @@ create_replication_stmt: CREATE ASYNC REPLICATION object_ref replication_target: object_ref AS object_ref; replication_settings: replication_settings_entry (COMMA replication_settings_entry)*; -replication_settings_entry: an_id EQUALS STRING_VALUE; +replication_settings_entry: an_id EQUALS expr; alter_replication_stmt: ALTER ASYNC REPLICATION object_ref alter_replication_action (COMMA alter_replication_action)*; alter_replication_action: diff --git a/yql/essentials/sql/v1/SQLv1Antlr4.g.in b/yql/essentials/sql/v1/SQLv1Antlr4.g.in index 87cbcaed3d..40593fe075 100644 --- a/yql/essentials/sql/v1/SQLv1Antlr4.g.in +++ b/yql/essentials/sql/v1/SQLv1Antlr4.g.in @@ -892,7 +892,7 @@ create_replication_stmt: CREATE ASYNC REPLICATION object_ref replication_target: object_ref AS object_ref; replication_settings: replication_settings_entry (COMMA replication_settings_entry)*; -replication_settings_entry: an_id EQUALS STRING_VALUE; +replication_settings_entry: an_id EQUALS expr; alter_replication_stmt: ALTER ASYNC REPLICATION object_ref alter_replication_action (COMMA alter_replication_action)*; alter_replication_action: diff --git a/yql/essentials/sql/v1/sql_query.cpp b/yql/essentials/sql/v1/sql_query.cpp index 37cd7ea1eb..781e5d7a5f 100644 --- a/yql/essentials/sql/v1/sql_query.cpp +++ b/yql/essentials/sql/v1/sql_query.cpp @@ -45,10 +45,15 @@ void TSqlQuery::AddStatementToBlocks(TVector<TNodePtr>& blocks, TNodePtr node) { } static bool AsyncReplicationSettingsEntry(std::map<TString, TNodePtr>& out, - const TRule_replication_settings_entry& in, TTranslation& ctx, bool create) + const TRule_replication_settings_entry& in, TSqlExpression& ctx, bool create) { auto key = IdEx(in.GetRule_an_id1(), ctx); - auto value = BuildLiteralSmartString(ctx.Context(), ctx.Token(in.GetToken3())); + auto value = ctx.Build(in.GetRule_expr3()); + + if (!value) { + ctx.Context().Error() << "Invalid replication setting: " << key.Name; + return false; + } TSet<TString> configSettings = { "connection_string", @@ -61,13 +66,18 @@ static bool AsyncReplicationSettingsEntry(std::map<TString, TNodePtr>& out, "password_secret_name", }; + TSet<TString> modeSettings = { + "consistency_mode", + "commit_interval", + }; + TSet<TString> stateSettings = { "state", "failover_mode", }; const auto keyName = to_lower(key.Name); - if (!configSettings.count(keyName) && !stateSettings.count(keyName)) { + if (!configSettings.count(keyName) && !modeSettings.count(keyName) && !stateSettings.count(keyName)) { ctx.Context().Error() << "Unknown replication setting: " << key.Name; return false; } @@ -77,6 +87,23 @@ static bool AsyncReplicationSettingsEntry(std::map<TString, TNodePtr>& out, return false; } + if (!create && modeSettings.count(keyName)) { + ctx.Context().Error() << key.Name << " is not supported in ALTER"; + return false; + } + + if (keyName == "commit_interval") { + if (value->GetOpName() != "Interval") { + ctx.Context().Error() << "Literal of Interval type is expected for " << key.Name; + return false; + } + } else { + if (!value->IsLiteral() || value->GetLiteralType() != "String") { + ctx.Context().Error() << "Literal of String type is expected for " << key.Name; + return false; + } + } + if (!out.emplace(keyName, value).second) { ctx.Context().Error() << "Duplicate replication setting: " << key.Name; } @@ -85,7 +112,7 @@ static bool AsyncReplicationSettingsEntry(std::map<TString, TNodePtr>& out, } static bool AsyncReplicationSettings(std::map<TString, TNodePtr>& out, - const TRule_replication_settings& in, TTranslation& ctx, bool create) + const TRule_replication_settings& in, TSqlExpression& ctx, bool create) { if (!AsyncReplicationSettingsEntry(out, in.GetRule_replication_settings_entry1(), ctx, create)) { return false; @@ -110,7 +137,7 @@ static bool AsyncReplicationTarget(std::vector<std::pair<TString, TString>>& out } static bool AsyncReplicationAlterAction(std::map<TString, TNodePtr>& settings, - const TRule_alter_replication_action& in, TTranslation& ctx) + const TRule_alter_replication_action& in, TSqlExpression& ctx) { // TODO(ilnaz): support other actions return AsyncReplicationSettings(settings, in.GetRule_alter_replication_set_setting1().GetRule_replication_settings3(), ctx, false); @@ -982,7 +1009,8 @@ bool TSqlQuery::Statement(TVector<TNodePtr>& blocks, const TRule_sql_stmt_core& } std::map<TString, TNodePtr> settings; - if (!AsyncReplicationSettings(settings, node.GetRule_replication_settings10(), *this, true)) { + TSqlExpression expr(Ctx, Mode); + if (!AsyncReplicationSettings(settings, node.GetRule_replication_settings10(), expr, true)) { return false; } @@ -1302,11 +1330,12 @@ bool TSqlQuery::Statement(TVector<TNodePtr>& blocks, const TRule_sql_stmt_core& } std::map<TString, TNodePtr> settings; - if (!AsyncReplicationAlterAction(settings, node.GetRule_alter_replication_action5(), *this)) { + TSqlExpression expr(Ctx, Mode); + if (!AsyncReplicationAlterAction(settings, node.GetRule_alter_replication_action5(), expr)) { return false; } for (auto& block : node.GetBlock6()) { - if (!AsyncReplicationAlterAction(settings, block.GetRule_alter_replication_action2(), *this)) { + if (!AsyncReplicationAlterAction(settings, block.GetRule_alter_replication_action2(), expr)) { return false; } } diff --git a/yql/essentials/sql/v1/sql_ut.cpp b/yql/essentials/sql/v1/sql_ut.cpp index f2cde62e79..e0d243929f 100644 --- a/yql/essentials/sql/v1/sql_ut.cpp +++ b/yql/essentials/sql/v1/sql_ut.cpp @@ -2997,6 +2997,21 @@ Y_UNIT_TEST_SUITE(SqlParsingOnly) { } } + Y_UNIT_TEST(AsyncReplicationInvalidCommitInterval) { + auto req = R"( + USE plato; + CREATE ASYNC REPLICATION MyReplication + FOR table1 AS table2, table3 AS table4 + WITH ( + COMMIT_INTERVAL = "FOO" + ); + )"; + + auto res = SqlToYql(req); + UNIT_ASSERT(!res.Root); + UNIT_ASSERT_NO_DIFF(Err2Str(res), "<main>:6:35: Error: Literal of Interval type is expected for COMMIT_INTERVAL\n"); + } + Y_UNIT_TEST(AlterAsyncReplicationParseCorrect) { auto req = R"( USE plato; @@ -3026,7 +3041,7 @@ Y_UNIT_TEST_SUITE(SqlParsingOnly) { UNIT_ASSERT_VALUES_EQUAL(1, elementStat["Write"]); } - Y_UNIT_TEST(AlterAsyncReplicationUnsupportedSettings) { + Y_UNIT_TEST(AlterAsyncReplicationSettings) { auto reqTpl = R"( USE plato; ALTER ASYNC REPLICATION MyReplication @@ -3046,19 +3061,17 @@ Y_UNIT_TEST_SUITE(SqlParsingOnly) { {"password_secret_name", "bar_secret_name"}, }; - for (const auto& setting : settings) { - auto& key = setting.first; - auto& value = setting.second; - auto req = Sprintf(reqTpl, key.c_str(), value.c_str()); + for (const auto& [k, v] : settings) { + auto req = Sprintf(reqTpl, k.c_str(), v.c_str()); auto res = SqlToYql(req); UNIT_ASSERT(res.Root); - TVerifyLineFunc verifyLine = [&key, &value](const TString& word, const TString& line) { + TVerifyLineFunc verifyLine = [&k, &v](const TString& word, const TString& line) { if (word == "Write") { UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("MyReplication")); UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("alter")); - UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find(key)); - UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find(value)); + UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find(k)); + UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find(v)); } }; @@ -3069,6 +3082,27 @@ Y_UNIT_TEST_SUITE(SqlParsingOnly) { } } + Y_UNIT_TEST(AlterAsyncReplicationUnsupportedSettings) { + { + auto req = R"( + USE plato; + ALTER ASYNC REPLICATION MyReplication SET (CONSISTENCY_MODE = "STRONG"); + )"; + auto res = SqlToYql(req); + UNIT_ASSERT(!res.Root); + UNIT_ASSERT_NO_DIFF(Err2Str(res), "<main>:3:79: Error: CONSISTENCY_MODE is not supported in ALTER\n"); + } + { + auto req = R"( + USE plato; + ALTER ASYNC REPLICATION MyReplication SET (COMMIT_INTERVAL = Interval("PT10S")); + )"; + auto res = SqlToYql(req); + UNIT_ASSERT(!res.Root); + UNIT_ASSERT_NO_DIFF(Err2Str(res), "<main>:3:87: Error: COMMIT_INTERVAL is not supported in ALTER\n"); + } + } + Y_UNIT_TEST(AsyncReplicationInvalidSettings) { auto req = R"( USE plato; diff --git a/yql/essentials/sql/v1/sql_ut_antlr4.cpp b/yql/essentials/sql/v1/sql_ut_antlr4.cpp index 4781156091..c561512136 100644 --- a/yql/essentials/sql/v1/sql_ut_antlr4.cpp +++ b/yql/essentials/sql/v1/sql_ut_antlr4.cpp @@ -2997,6 +2997,21 @@ Y_UNIT_TEST_SUITE(SqlParsingOnly) { } } + Y_UNIT_TEST(AsyncReplicationInvalidCommitInterval) { + auto req = R"( + USE plato; + CREATE ASYNC REPLICATION MyReplication + FOR table1 AS table2, table3 AS table4 + WITH ( + COMMIT_INTERVAL = "FOO" + ); + )"; + + auto res = SqlToYql(req); + UNIT_ASSERT(!res.Root); + UNIT_ASSERT_NO_DIFF(Err2Str(res), "<main>:6:35: Error: Literal of Interval type is expected for COMMIT_INTERVAL\n"); + } + Y_UNIT_TEST(AlterAsyncReplicationParseCorrect) { auto req = R"( USE plato; @@ -3026,7 +3041,7 @@ Y_UNIT_TEST_SUITE(SqlParsingOnly) { UNIT_ASSERT_VALUES_EQUAL(1, elementStat["Write"]); } - Y_UNIT_TEST(AlterAsyncReplicationUnsupportedSettings) { + Y_UNIT_TEST(AlterAsyncReplicationSettings) { auto reqTpl = R"( USE plato; ALTER ASYNC REPLICATION MyReplication @@ -3046,19 +3061,17 @@ Y_UNIT_TEST_SUITE(SqlParsingOnly) { {"password_secret_name", "bar_secret_name"}, }; - for (const auto& setting : settings) { - auto& key = setting.first; - auto& value = setting.second; - auto req = Sprintf(reqTpl, key.c_str(), value.c_str()); + for (const auto& [k, v] : settings) { + auto req = Sprintf(reqTpl, k.c_str(), v.c_str()); auto res = SqlToYql(req); UNIT_ASSERT(res.Root); - TVerifyLineFunc verifyLine = [&key, &value](const TString& word, const TString& line) { + TVerifyLineFunc verifyLine = [&k, &v](const TString& word, const TString& line) { if (word == "Write") { UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("MyReplication")); UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("alter")); - UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find(key)); - UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find(value)); + UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find(k)); + UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find(v)); } }; @@ -3069,6 +3082,27 @@ Y_UNIT_TEST_SUITE(SqlParsingOnly) { } } + Y_UNIT_TEST(AlterAsyncReplicationUnsupportedSettings) { + { + auto req = R"( + USE plato; + ALTER ASYNC REPLICATION MyReplication SET (CONSISTENCY_MODE = "STRONG"); + )"; + auto res = SqlToYql(req); + UNIT_ASSERT(!res.Root); + UNIT_ASSERT_NO_DIFF(Err2Str(res), "<main>:3:79: Error: CONSISTENCY_MODE is not supported in ALTER\n"); + } + { + auto req = R"( + USE plato; + ALTER ASYNC REPLICATION MyReplication SET (COMMIT_INTERVAL = Interval("PT10S")); + )"; + auto res = SqlToYql(req); + UNIT_ASSERT(!res.Root); + UNIT_ASSERT_NO_DIFF(Err2Str(res), "<main>:3:87: Error: COMMIT_INTERVAL is not supported in ALTER\n"); + } + } + Y_UNIT_TEST(AsyncReplicationInvalidSettings) { auto req = R"( USE plato; |