aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorIlnaz Nizametdinov <ilnaz@ydb.tech>2024-04-22 14:56:00 +0300
committerGitHub <noreply@github.com>2024-04-22 14:56:00 +0300
commitfa75a49b04f0c5ffaab7c925e9ebd296ba2dff5b (patch)
tree19fd642092ccad23bb5f5c0546c0c690b447bdc1
parent73d910dc7c6ea4f83ec66ced981b375559e1641a (diff)
downloadydb-fa75a49b04f0c5ffaab7c925e9ebd296ba2dff5b.tar.gz
Validate ASYNC REPLICATION settings (#3973)
-rw-r--r--ydb/library/yql/sql/v1/query.cpp16
-rw-r--r--ydb/library/yql/sql/v1/sql_query.cpp70
-rw-r--r--ydb/library/yql/sql/v1/sql_ut.cpp79
3 files changed, 128 insertions, 37 deletions
diff --git a/ydb/library/yql/sql/v1/query.cpp b/ydb/library/yql/sql/v1/query.cpp
index 113559e265..89af509b1f 100644
--- a/ydb/library/yql/sql/v1/query.cpp
+++ b/ydb/library/yql/sql/v1/query.cpp
@@ -2300,7 +2300,7 @@ public:
bool DoInit(TContext& ctx, ISource* src) override {
Scoped->UseCluster(ServiceId, Cluster);
- auto keys = Y("Key", Q(Y(Q("id"), Y("String", BuildQuotedAtom(Pos, Id)))));
+ auto keys = Y("Key", Q(Y(Q("replication"), Y("String", BuildQuotedAtom(Pos, Id)))));
auto options = FillOptions(Y(Q(Y(Q("mode"), Q(Mode)))));
Add("block", Q(Y(
@@ -2328,7 +2328,7 @@ public:
std::vector<std::pair<TString, TString>>&& targets,
std::map<TString, TNodePtr>&& settings,
const TObjectOperatorContext& context)
- : TAsyncReplication(pos, id, "createAsyncReplication", context)
+ : TAsyncReplication(pos, id, "create", context)
, Targets(std::move(targets))
, Settings(std::move(settings))
{
@@ -2379,23 +2379,15 @@ TNodePtr BuildCreateAsyncReplication(TPosition pos, const TString& id,
class TDropAsyncReplication final: public TAsyncReplication {
public:
explicit TDropAsyncReplication(TPosition pos, const TString& id, bool cascade, const TObjectOperatorContext& context)
- : TAsyncReplication(pos, id, "dropAsyncReplication", context)
- , Cascade(cascade)
+ : TAsyncReplication(pos, id, cascade ? "dropCascade" : "drop", context)
{
}
protected:
INode::TPtr FillOptions(INode::TPtr options) const override {
- if (Cascade) {
- options = L(options, Q(Y(Q("cascade"))));
- }
-
return options;
}
-private:
- const bool Cascade;
-
}; // TDropAsyncReplication
TNodePtr BuildDropAsyncReplication(TPosition pos, const TString& id, bool cascade, const TObjectOperatorContext& context) {
@@ -2407,7 +2399,7 @@ public:
explicit TAlterAsyncReplication(TPosition pos, const TString& id,
std::map<TString, TNodePtr>&& settings,
const TObjectOperatorContext& context)
- : TAsyncReplication(pos, id, "alterAsyncReplication", context)
+ : TAsyncReplication(pos, id, "alter", context)
, Settings(std::move(settings))
{
}
diff --git a/ydb/library/yql/sql/v1/sql_query.cpp b/ydb/library/yql/sql/v1/sql_query.cpp
index e1c827f473..aec8cc7905 100644
--- a/ydb/library/yql/sql/v1/sql_query.cpp
+++ b/ydb/library/yql/sql/v1/sql_query.cpp
@@ -40,21 +40,48 @@ void TSqlQuery::AddStatementToBlocks(TVector<TNodePtr>& blocks, TNodePtr node) {
blocks.emplace_back(node);
}
-static bool AsyncReplicationSettingsEntry(std::map<TString, TNodePtr>& out, const TRule_replication_settings_entry& in, TTranslation& ctx) {
- auto key = Id(in.GetRule_an_id1(), ctx);
+static bool AsyncReplicationSettingsEntry(std::map<TString, TNodePtr>& out,
+ const TRule_replication_settings_entry& in, TTranslation& ctx, bool alter)
+{
+ auto key = IdEx(in.GetRule_an_id1(), ctx);
auto value = BuildLiteralSmartString(ctx.Context(), ctx.Token(in.GetToken3()));
- // TODO(ilnaz): validate
- out.emplace(std::move(key), value);
+
+ THashMap<TString, bool> settings = {
+ {"connection_string", false},
+ {"endpoint", false},
+ {"database", false},
+ {"token", false},
+ {"user", false},
+ {"password", false},
+ {"state", true},
+ {"failover_mode", true},
+ };
+
+ auto it = settings.find(to_lower(key.Name));
+ if (it == settings.end()) {
+ ctx.Context().Error() << "Unknown replication setting: " << key.Name;
+ return false;
+ } else if (alter != it->second) {
+ ctx.Context().Error() << key.Name << " is not supported in " << (alter ? "ALTER" : "CREATE");
+ return false;
+ }
+
+ if (!out.emplace(it->first, value).second) {
+ ctx.Context().Error() << "Duplicate replication setting: " << key.Name;
+ }
+
return true;
}
-static bool AsyncReplicationSettings(std::map<TString, TNodePtr>& out, const TRule_replication_settings& in, TTranslation& ctx) {
- if (!AsyncReplicationSettingsEntry(out, in.GetRule_replication_settings_entry1(), ctx)) {
+static bool AsyncReplicationSettings(std::map<TString, TNodePtr>& out,
+ const TRule_replication_settings& in, TTranslation& ctx, bool alter = false)
+{
+ if (!AsyncReplicationSettingsEntry(out, in.GetRule_replication_settings_entry1(), ctx, alter)) {
return false;
}
for (auto& block : in.GetBlock2()) {
- if (!AsyncReplicationSettingsEntry(out, block.GetRule_replication_settings_entry2(), ctx)) {
+ if (!AsyncReplicationSettingsEntry(out, block.GetRule_replication_settings_entry2(), ctx, alter)) {
return false;
}
}
@@ -62,16 +89,20 @@ static bool AsyncReplicationSettings(std::map<TString, TNodePtr>& out, const TRu
return true;
}
-static bool AsyncReplicationTarget(std::vector<std::pair<TString, TString>>& out, const TRule_replication_target& in, TTranslation& ctx) {
+static bool AsyncReplicationTarget(std::vector<std::pair<TString, TString>>& out, TStringBuf prefixPath,
+ const TRule_replication_target& in, TTranslation& ctx)
+{
const TString remote = Id(in.GetRule_object_ref1().GetRule_id_or_at2(), ctx).second;
const TString local = Id(in.GetRule_object_ref3().GetRule_id_or_at2(), ctx).second;
- out.emplace_back(remote, local);
+ out.emplace_back(remote, BuildTablePath(prefixPath, local));
return true;
}
-static bool AsyncReplicationAlterAction(std::map<TString, TNodePtr>& settings, const TRule_alter_replication_action& in, TTranslation& ctx) {
+static bool AsyncReplicationAlterAction(std::map<TString, TNodePtr>& settings,
+ const TRule_alter_replication_action& in, TTranslation& ctx)
+{
// TODO(ilnaz): support other actions
- return AsyncReplicationSettings(settings, in.GetRule_alter_replication_set_setting1().GetRule_replication_settings3(), ctx);
+ return AsyncReplicationSettings(settings, in.GetRule_alter_replication_set_setting1().GetRule_replication_settings3(), ctx, true);
}
bool TSqlQuery::Statement(TVector<TNodePtr>& blocks, const TRule_sql_stmt_core& core) {
@@ -899,12 +930,14 @@ bool TSqlQuery::Statement(TVector<TNodePtr>& blocks, const TRule_sql_stmt_core&
}
}
+ auto prefixPath = Ctx.GetPrefixPath(context.ServiceId, context.Cluster);
+
std::vector<std::pair<TString, TString>> targets;
- if (!AsyncReplicationTarget(targets, node.GetRule_replication_target6(), *this)) {
+ if (!AsyncReplicationTarget(targets, prefixPath, node.GetRule_replication_target6(), *this)) {
return false;
}
for (auto& block : node.GetBlock7()) {
- if (!AsyncReplicationTarget(targets, block.GetRule_replication_target2(), *this)) {
+ if (!AsyncReplicationTarget(targets, prefixPath, block.GetRule_replication_target2(), *this)) {
return false;
}
}
@@ -915,7 +948,8 @@ bool TSqlQuery::Statement(TVector<TNodePtr>& blocks, const TRule_sql_stmt_core&
}
const TString id = Id(node.GetRule_object_ref4().GetRule_id_or_at2(), *this).second;
- AddStatementToBlocks(blocks, BuildCreateAsyncReplication(Ctx.Pos(), id, std::move(targets), std::move(settings), context));
+ AddStatementToBlocks(blocks, BuildCreateAsyncReplication(Ctx.Pos(), BuildTablePath(prefixPath, id),
+ std::move(targets), std::move(settings), context));
break;
}
case TRule_sql_stmt_core::kAltSqlStmtCore34: {
@@ -930,7 +964,9 @@ bool TSqlQuery::Statement(TVector<TNodePtr>& blocks, const TRule_sql_stmt_core&
}
const TString id = Id(node.GetRule_object_ref4().GetRule_id_or_at2(), *this).second;
- AddStatementToBlocks(blocks, BuildDropAsyncReplication(Ctx.Pos(), id, node.HasBlock5(), context));
+ AddStatementToBlocks(blocks, BuildDropAsyncReplication(Ctx.Pos(),
+ BuildTablePath(Ctx.GetPrefixPath(context.ServiceId, context.Cluster), id),
+ node.HasBlock5(), context));
break;
}
case TRule_sql_stmt_core::kAltSqlStmtCore35: {
@@ -1209,7 +1245,9 @@ bool TSqlQuery::Statement(TVector<TNodePtr>& blocks, const TRule_sql_stmt_core&
}
const TString id = Id(node.GetRule_object_ref4().GetRule_id_or_at2(), *this).second;
- AddStatementToBlocks(blocks, BuildAlterAsyncReplication(Ctx.Pos(), id, std::move(settings), context));
+ AddStatementToBlocks(blocks, BuildAlterAsyncReplication(Ctx.Pos(),
+ BuildTablePath(Ctx.GetPrefixPath(context.ServiceId, context.Cluster), id),
+ std::move(settings), context));
break;
}
case TRule_sql_stmt_core::ALT_NOT_SET:
diff --git a/ydb/library/yql/sql/v1/sql_ut.cpp b/ydb/library/yql/sql/v1/sql_ut.cpp
index ac53b5f914..751a56aa3b 100644
--- a/ydb/library/yql/sql/v1/sql_ut.cpp
+++ b/ydb/library/yql/sql/v1/sql_ut.cpp
@@ -2559,6 +2559,7 @@ Y_UNIT_TEST_SUITE(SqlParsingOnly) {
CREATE ASYNC REPLICATION MyReplication
FOR table1 AS table2, table3 AS table4
WITH (
+ CONNECTION_STRING = "grpc://localhost:2135/?database=/MyDatabase",
ENDPOINT = "localhost:2135",
DATABASE = "/MyDatabase"
);
@@ -2569,14 +2570,16 @@ Y_UNIT_TEST_SUITE(SqlParsingOnly) {
TVerifyLineFunc verifyLine = [](const TString& word, const TString& line) {
if (word == "Write") {
UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("MyReplication"));
- UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("createAsyncReplication"));
+ UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("create"));
UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("table1"));
UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("table2"));
UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("table3"));
UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("table4"));
- UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("ENDPOINT"));
+ UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("connection_string"));
+ UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("grpc://localhost:2135/?database=/MyDatabase"));
+ UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("endpoint"));
UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("localhost:2135"));
- UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("DATABASE"));
+ UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("database"));
UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("/MyDatabase"));
}
};
@@ -2587,6 +2590,29 @@ Y_UNIT_TEST_SUITE(SqlParsingOnly) {
UNIT_ASSERT_VALUES_EQUAL(1, elementStat["Write"]);
}
+ Y_UNIT_TEST(CreateAsyncReplicationUnsupportedSettings) {
+ auto reqTpl = R"(
+ USE plato;
+ CREATE ASYNC REPLICATION MyReplication
+ FOR table1 AS table2, table3 AS table4
+ WITH (
+ %s = "%s"
+ )
+ )";
+
+ auto settings = THashMap<TString, TString>{
+ {"STATE", "DONE"},
+ {"FAILOVER_MODE", "FORCE"},
+ };
+
+ for (const auto& [k, v] : settings) {
+ auto req = Sprintf(reqTpl, k.c_str(), v.c_str());
+ auto res = SqlToYql(req);
+ UNIT_ASSERT(!res.Root);
+ UNIT_ASSERT_NO_DIFF(Err2Str(res), Sprintf("<main>:6:%zu: Error: %s is not supported in CREATE\n", 20 + k.size(), k.c_str()));
+ }
+ }
+
Y_UNIT_TEST(AlterAsyncReplicationParseCorrect) {
auto req = R"(
USE plato;
@@ -2602,10 +2628,10 @@ Y_UNIT_TEST_SUITE(SqlParsingOnly) {
TVerifyLineFunc verifyLine = [](const TString& word, const TString& line) {
if (word == "Write") {
UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("MyReplication"));
- UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("alterAsyncReplication"));
- UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("STATE"));
+ UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("alter"));
+ UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("state"));
UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("DONE"));
- UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("FAILOVER_MODE"));
+ UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("failover_mode"));
UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("FORCE"));
}
};
@@ -2616,6 +2642,42 @@ Y_UNIT_TEST_SUITE(SqlParsingOnly) {
UNIT_ASSERT_VALUES_EQUAL(1, elementStat["Write"]);
}
+ Y_UNIT_TEST(AlterAsyncReplicationUnsupportedSettings) {
+ auto reqTpl = R"(
+ USE plato;
+ ALTER ASYNC REPLICATION MyReplication
+ SET (
+ %s = "%s"
+ )
+ )";
+
+ auto settings = THashMap<TString, TString>{
+ {"CONNECTION_STRING", "grpc://localhost:2135/?database=/MyDatabase"},
+ {"ENDPOINT", "localhost:2135"},
+ {"DATABASE", "/MyDatabase"},
+ {"TOKEN", "foo"},
+ {"USER", "user"},
+ {"PASSWORD", "bar"},
+ };
+
+ for (const auto& [k, v] : settings) {
+ auto req = Sprintf(reqTpl, k.c_str(), v.c_str());
+ auto res = SqlToYql(req);
+ UNIT_ASSERT(!res.Root);
+ UNIT_ASSERT_NO_DIFF(Err2Str(res), Sprintf("<main>:5:%zu: Error: %s is not supported in ALTER\n", 20 + k.size(), k.c_str()));
+ }
+ }
+
+ Y_UNIT_TEST(AsyncReplicationInvalidSettings) {
+ auto req = R"(
+ USE plato;
+ ALTER ASYNC REPLICATION MyReplication SET (FOO = "BAR");
+ )";
+ auto res = SqlToYql(req);
+ UNIT_ASSERT(!res.Root);
+ UNIT_ASSERT_NO_DIFF(Err2Str(res), "<main>:3:62: Error: Unknown replication setting: FOO\n");
+ }
+
Y_UNIT_TEST(DropAsyncReplicationParseCorrect) {
auto req = R"(
USE plato;
@@ -2627,8 +2689,7 @@ Y_UNIT_TEST_SUITE(SqlParsingOnly) {
TVerifyLineFunc verifyLine = [](const TString& word, const TString& line) {
if (word == "Write") {
UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("MyReplication"));
- UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("dropAsyncReplication"));
- UNIT_ASSERT_VALUES_EQUAL(TString::npos, line.find("cascade"));
+ UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("drop"));
}
};
@@ -2648,7 +2709,7 @@ Y_UNIT_TEST_SUITE(SqlParsingOnly) {
TVerifyLineFunc verifyLine = [](const TString& word, const TString& line) {
if (word == "Write") {
- UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("cascade"));
+ UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("dropCascade"));
}
};