aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorilnaz <ilnaz@yandex-team.com>2024-12-03 13:17:06 +0300
committerilnaz <ilnaz@yandex-team.com>2024-12-03 13:33:17 +0300
commit556c8da53b8579d7aceb68fe8fa3513851464a75 (patch)
tree549beca12e961d2c2fbee142cc6856ae0a22b336
parent6e7f04fb442b3763c07903c984fed7e40dc0dd5f (diff)
downloadydb-556c8da53b8579d7aceb68fe8fa3513851464a75.tar.gz
New options for ASYNC REPLICATION
commit_hash:7ee0e4b59035ed5c8dedc69e00f95457eca65b88
-rw-r--r--yql/essentials/sql/v1/SQLv1.g.in2
-rw-r--r--yql/essentials/sql/v1/SQLv1Antlr4.g.in2
-rw-r--r--yql/essentials/sql/v1/sql_query.cpp45
-rw-r--r--yql/essentials/sql/v1/sql_ut.cpp50
-rw-r--r--yql/essentials/sql/v1/sql_ut_antlr4.cpp50
5 files changed, 123 insertions, 26 deletions
diff --git a/yql/essentials/sql/v1/SQLv1.g.in b/yql/essentials/sql/v1/SQLv1.g.in
index b31594eba3..e9685c5094 100644
--- a/yql/essentials/sql/v1/SQLv1.g.in
+++ b/yql/essentials/sql/v1/SQLv1.g.in
@@ -893,7 +893,7 @@ create_replication_stmt: CREATE ASYNC REPLICATION object_ref
replication_target: object_ref AS object_ref;
replication_settings: replication_settings_entry (COMMA replication_settings_entry)*;
-replication_settings_entry: an_id EQUALS STRING_VALUE;
+replication_settings_entry: an_id EQUALS expr;
alter_replication_stmt: ALTER ASYNC REPLICATION object_ref alter_replication_action (COMMA alter_replication_action)*;
alter_replication_action:
diff --git a/yql/essentials/sql/v1/SQLv1Antlr4.g.in b/yql/essentials/sql/v1/SQLv1Antlr4.g.in
index 87cbcaed3d..40593fe075 100644
--- a/yql/essentials/sql/v1/SQLv1Antlr4.g.in
+++ b/yql/essentials/sql/v1/SQLv1Antlr4.g.in
@@ -892,7 +892,7 @@ create_replication_stmt: CREATE ASYNC REPLICATION object_ref
replication_target: object_ref AS object_ref;
replication_settings: replication_settings_entry (COMMA replication_settings_entry)*;
-replication_settings_entry: an_id EQUALS STRING_VALUE;
+replication_settings_entry: an_id EQUALS expr;
alter_replication_stmt: ALTER ASYNC REPLICATION object_ref alter_replication_action (COMMA alter_replication_action)*;
alter_replication_action:
diff --git a/yql/essentials/sql/v1/sql_query.cpp b/yql/essentials/sql/v1/sql_query.cpp
index 37cd7ea1eb..781e5d7a5f 100644
--- a/yql/essentials/sql/v1/sql_query.cpp
+++ b/yql/essentials/sql/v1/sql_query.cpp
@@ -45,10 +45,15 @@ void TSqlQuery::AddStatementToBlocks(TVector<TNodePtr>& blocks, TNodePtr node) {
}
static bool AsyncReplicationSettingsEntry(std::map<TString, TNodePtr>& out,
- const TRule_replication_settings_entry& in, TTranslation& ctx, bool create)
+ const TRule_replication_settings_entry& in, TSqlExpression& ctx, bool create)
{
auto key = IdEx(in.GetRule_an_id1(), ctx);
- auto value = BuildLiteralSmartString(ctx.Context(), ctx.Token(in.GetToken3()));
+ auto value = ctx.Build(in.GetRule_expr3());
+
+ if (!value) {
+ ctx.Context().Error() << "Invalid replication setting: " << key.Name;
+ return false;
+ }
TSet<TString> configSettings = {
"connection_string",
@@ -61,13 +66,18 @@ static bool AsyncReplicationSettingsEntry(std::map<TString, TNodePtr>& out,
"password_secret_name",
};
+ TSet<TString> modeSettings = {
+ "consistency_mode",
+ "commit_interval",
+ };
+
TSet<TString> stateSettings = {
"state",
"failover_mode",
};
const auto keyName = to_lower(key.Name);
- if (!configSettings.count(keyName) && !stateSettings.count(keyName)) {
+ if (!configSettings.count(keyName) && !modeSettings.count(keyName) && !stateSettings.count(keyName)) {
ctx.Context().Error() << "Unknown replication setting: " << key.Name;
return false;
}
@@ -77,6 +87,23 @@ static bool AsyncReplicationSettingsEntry(std::map<TString, TNodePtr>& out,
return false;
}
+ if (!create && modeSettings.count(keyName)) {
+ ctx.Context().Error() << key.Name << " is not supported in ALTER";
+ return false;
+ }
+
+ if (keyName == "commit_interval") {
+ if (value->GetOpName() != "Interval") {
+ ctx.Context().Error() << "Literal of Interval type is expected for " << key.Name;
+ return false;
+ }
+ } else {
+ if (!value->IsLiteral() || value->GetLiteralType() != "String") {
+ ctx.Context().Error() << "Literal of String type is expected for " << key.Name;
+ return false;
+ }
+ }
+
if (!out.emplace(keyName, value).second) {
ctx.Context().Error() << "Duplicate replication setting: " << key.Name;
}
@@ -85,7 +112,7 @@ static bool AsyncReplicationSettingsEntry(std::map<TString, TNodePtr>& out,
}
static bool AsyncReplicationSettings(std::map<TString, TNodePtr>& out,
- const TRule_replication_settings& in, TTranslation& ctx, bool create)
+ const TRule_replication_settings& in, TSqlExpression& ctx, bool create)
{
if (!AsyncReplicationSettingsEntry(out, in.GetRule_replication_settings_entry1(), ctx, create)) {
return false;
@@ -110,7 +137,7 @@ static bool AsyncReplicationTarget(std::vector<std::pair<TString, TString>>& out
}
static bool AsyncReplicationAlterAction(std::map<TString, TNodePtr>& settings,
- const TRule_alter_replication_action& in, TTranslation& ctx)
+ const TRule_alter_replication_action& in, TSqlExpression& ctx)
{
// TODO(ilnaz): support other actions
return AsyncReplicationSettings(settings, in.GetRule_alter_replication_set_setting1().GetRule_replication_settings3(), ctx, false);
@@ -982,7 +1009,8 @@ bool TSqlQuery::Statement(TVector<TNodePtr>& blocks, const TRule_sql_stmt_core&
}
std::map<TString, TNodePtr> settings;
- if (!AsyncReplicationSettings(settings, node.GetRule_replication_settings10(), *this, true)) {
+ TSqlExpression expr(Ctx, Mode);
+ if (!AsyncReplicationSettings(settings, node.GetRule_replication_settings10(), expr, true)) {
return false;
}
@@ -1302,11 +1330,12 @@ bool TSqlQuery::Statement(TVector<TNodePtr>& blocks, const TRule_sql_stmt_core&
}
std::map<TString, TNodePtr> settings;
- if (!AsyncReplicationAlterAction(settings, node.GetRule_alter_replication_action5(), *this)) {
+ TSqlExpression expr(Ctx, Mode);
+ if (!AsyncReplicationAlterAction(settings, node.GetRule_alter_replication_action5(), expr)) {
return false;
}
for (auto& block : node.GetBlock6()) {
- if (!AsyncReplicationAlterAction(settings, block.GetRule_alter_replication_action2(), *this)) {
+ if (!AsyncReplicationAlterAction(settings, block.GetRule_alter_replication_action2(), expr)) {
return false;
}
}
diff --git a/yql/essentials/sql/v1/sql_ut.cpp b/yql/essentials/sql/v1/sql_ut.cpp
index f2cde62e79..e0d243929f 100644
--- a/yql/essentials/sql/v1/sql_ut.cpp
+++ b/yql/essentials/sql/v1/sql_ut.cpp
@@ -2997,6 +2997,21 @@ Y_UNIT_TEST_SUITE(SqlParsingOnly) {
}
}
+ Y_UNIT_TEST(AsyncReplicationInvalidCommitInterval) {
+ auto req = R"(
+ USE plato;
+ CREATE ASYNC REPLICATION MyReplication
+ FOR table1 AS table2, table3 AS table4
+ WITH (
+ COMMIT_INTERVAL = "FOO"
+ );
+ )";
+
+ auto res = SqlToYql(req);
+ UNIT_ASSERT(!res.Root);
+ UNIT_ASSERT_NO_DIFF(Err2Str(res), "<main>:6:35: Error: Literal of Interval type is expected for COMMIT_INTERVAL\n");
+ }
+
Y_UNIT_TEST(AlterAsyncReplicationParseCorrect) {
auto req = R"(
USE plato;
@@ -3026,7 +3041,7 @@ Y_UNIT_TEST_SUITE(SqlParsingOnly) {
UNIT_ASSERT_VALUES_EQUAL(1, elementStat["Write"]);
}
- Y_UNIT_TEST(AlterAsyncReplicationUnsupportedSettings) {
+ Y_UNIT_TEST(AlterAsyncReplicationSettings) {
auto reqTpl = R"(
USE plato;
ALTER ASYNC REPLICATION MyReplication
@@ -3046,19 +3061,17 @@ Y_UNIT_TEST_SUITE(SqlParsingOnly) {
{"password_secret_name", "bar_secret_name"},
};
- for (const auto& setting : settings) {
- auto& key = setting.first;
- auto& value = setting.second;
- auto req = Sprintf(reqTpl, key.c_str(), value.c_str());
+ for (const auto& [k, v] : settings) {
+ auto req = Sprintf(reqTpl, k.c_str(), v.c_str());
auto res = SqlToYql(req);
UNIT_ASSERT(res.Root);
- TVerifyLineFunc verifyLine = [&key, &value](const TString& word, const TString& line) {
+ TVerifyLineFunc verifyLine = [&k, &v](const TString& word, const TString& line) {
if (word == "Write") {
UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("MyReplication"));
UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("alter"));
- UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find(key));
- UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find(value));
+ UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find(k));
+ UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find(v));
}
};
@@ -3069,6 +3082,27 @@ Y_UNIT_TEST_SUITE(SqlParsingOnly) {
}
}
+ Y_UNIT_TEST(AlterAsyncReplicationUnsupportedSettings) {
+ {
+ auto req = R"(
+ USE plato;
+ ALTER ASYNC REPLICATION MyReplication SET (CONSISTENCY_MODE = "STRONG");
+ )";
+ auto res = SqlToYql(req);
+ UNIT_ASSERT(!res.Root);
+ UNIT_ASSERT_NO_DIFF(Err2Str(res), "<main>:3:79: Error: CONSISTENCY_MODE is not supported in ALTER\n");
+ }
+ {
+ auto req = R"(
+ USE plato;
+ ALTER ASYNC REPLICATION MyReplication SET (COMMIT_INTERVAL = Interval("PT10S"));
+ )";
+ auto res = SqlToYql(req);
+ UNIT_ASSERT(!res.Root);
+ UNIT_ASSERT_NO_DIFF(Err2Str(res), "<main>:3:87: Error: COMMIT_INTERVAL is not supported in ALTER\n");
+ }
+ }
+
Y_UNIT_TEST(AsyncReplicationInvalidSettings) {
auto req = R"(
USE plato;
diff --git a/yql/essentials/sql/v1/sql_ut_antlr4.cpp b/yql/essentials/sql/v1/sql_ut_antlr4.cpp
index 4781156091..c561512136 100644
--- a/yql/essentials/sql/v1/sql_ut_antlr4.cpp
+++ b/yql/essentials/sql/v1/sql_ut_antlr4.cpp
@@ -2997,6 +2997,21 @@ Y_UNIT_TEST_SUITE(SqlParsingOnly) {
}
}
+ Y_UNIT_TEST(AsyncReplicationInvalidCommitInterval) {
+ auto req = R"(
+ USE plato;
+ CREATE ASYNC REPLICATION MyReplication
+ FOR table1 AS table2, table3 AS table4
+ WITH (
+ COMMIT_INTERVAL = "FOO"
+ );
+ )";
+
+ auto res = SqlToYql(req);
+ UNIT_ASSERT(!res.Root);
+ UNIT_ASSERT_NO_DIFF(Err2Str(res), "<main>:6:35: Error: Literal of Interval type is expected for COMMIT_INTERVAL\n");
+ }
+
Y_UNIT_TEST(AlterAsyncReplicationParseCorrect) {
auto req = R"(
USE plato;
@@ -3026,7 +3041,7 @@ Y_UNIT_TEST_SUITE(SqlParsingOnly) {
UNIT_ASSERT_VALUES_EQUAL(1, elementStat["Write"]);
}
- Y_UNIT_TEST(AlterAsyncReplicationUnsupportedSettings) {
+ Y_UNIT_TEST(AlterAsyncReplicationSettings) {
auto reqTpl = R"(
USE plato;
ALTER ASYNC REPLICATION MyReplication
@@ -3046,19 +3061,17 @@ Y_UNIT_TEST_SUITE(SqlParsingOnly) {
{"password_secret_name", "bar_secret_name"},
};
- for (const auto& setting : settings) {
- auto& key = setting.first;
- auto& value = setting.second;
- auto req = Sprintf(reqTpl, key.c_str(), value.c_str());
+ for (const auto& [k, v] : settings) {
+ auto req = Sprintf(reqTpl, k.c_str(), v.c_str());
auto res = SqlToYql(req);
UNIT_ASSERT(res.Root);
- TVerifyLineFunc verifyLine = [&key, &value](const TString& word, const TString& line) {
+ TVerifyLineFunc verifyLine = [&k, &v](const TString& word, const TString& line) {
if (word == "Write") {
UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("MyReplication"));
UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("alter"));
- UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find(key));
- UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find(value));
+ UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find(k));
+ UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find(v));
}
};
@@ -3069,6 +3082,27 @@ Y_UNIT_TEST_SUITE(SqlParsingOnly) {
}
}
+ Y_UNIT_TEST(AlterAsyncReplicationUnsupportedSettings) {
+ {
+ auto req = R"(
+ USE plato;
+ ALTER ASYNC REPLICATION MyReplication SET (CONSISTENCY_MODE = "STRONG");
+ )";
+ auto res = SqlToYql(req);
+ UNIT_ASSERT(!res.Root);
+ UNIT_ASSERT_NO_DIFF(Err2Str(res), "<main>:3:79: Error: CONSISTENCY_MODE is not supported in ALTER\n");
+ }
+ {
+ auto req = R"(
+ USE plato;
+ ALTER ASYNC REPLICATION MyReplication SET (COMMIT_INTERVAL = Interval("PT10S"));
+ )";
+ auto res = SqlToYql(req);
+ UNIT_ASSERT(!res.Root);
+ UNIT_ASSERT_NO_DIFF(Err2Str(res), "<main>:3:87: Error: COMMIT_INTERVAL is not supported in ALTER\n");
+ }
+ }
+
Y_UNIT_TEST(AsyncReplicationInvalidSettings) {
auto req = R"(
USE plato;