diff options
author | Ilnaz Nizametdinov <ilnaz@ydb.tech> | 2024-04-22 14:56:00 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-04-22 14:56:00 +0300 |
commit | fa75a49b04f0c5ffaab7c925e9ebd296ba2dff5b (patch) | |
tree | 19fd642092ccad23bb5f5c0546c0c690b447bdc1 | |
parent | 73d910dc7c6ea4f83ec66ced981b375559e1641a (diff) | |
download | ydb-fa75a49b04f0c5ffaab7c925e9ebd296ba2dff5b.tar.gz |
Validate ASYNC REPLICATION settings (#3973)
-rw-r--r-- | ydb/library/yql/sql/v1/query.cpp | 16 | ||||
-rw-r--r-- | ydb/library/yql/sql/v1/sql_query.cpp | 70 | ||||
-rw-r--r-- | ydb/library/yql/sql/v1/sql_ut.cpp | 79 |
3 files changed, 128 insertions, 37 deletions
diff --git a/ydb/library/yql/sql/v1/query.cpp b/ydb/library/yql/sql/v1/query.cpp index 113559e265..89af509b1f 100644 --- a/ydb/library/yql/sql/v1/query.cpp +++ b/ydb/library/yql/sql/v1/query.cpp @@ -2300,7 +2300,7 @@ public: bool DoInit(TContext& ctx, ISource* src) override { Scoped->UseCluster(ServiceId, Cluster); - auto keys = Y("Key", Q(Y(Q("id"), Y("String", BuildQuotedAtom(Pos, Id))))); + auto keys = Y("Key", Q(Y(Q("replication"), Y("String", BuildQuotedAtom(Pos, Id))))); auto options = FillOptions(Y(Q(Y(Q("mode"), Q(Mode))))); Add("block", Q(Y( @@ -2328,7 +2328,7 @@ public: std::vector<std::pair<TString, TString>>&& targets, std::map<TString, TNodePtr>&& settings, const TObjectOperatorContext& context) - : TAsyncReplication(pos, id, "createAsyncReplication", context) + : TAsyncReplication(pos, id, "create", context) , Targets(std::move(targets)) , Settings(std::move(settings)) { @@ -2379,23 +2379,15 @@ TNodePtr BuildCreateAsyncReplication(TPosition pos, const TString& id, class TDropAsyncReplication final: public TAsyncReplication { public: explicit TDropAsyncReplication(TPosition pos, const TString& id, bool cascade, const TObjectOperatorContext& context) - : TAsyncReplication(pos, id, "dropAsyncReplication", context) - , Cascade(cascade) + : TAsyncReplication(pos, id, cascade ? "dropCascade" : "drop", context) { } protected: INode::TPtr FillOptions(INode::TPtr options) const override { - if (Cascade) { - options = L(options, Q(Y(Q("cascade")))); - } - return options; } -private: - const bool Cascade; - }; // TDropAsyncReplication TNodePtr BuildDropAsyncReplication(TPosition pos, const TString& id, bool cascade, const TObjectOperatorContext& context) { @@ -2407,7 +2399,7 @@ public: explicit TAlterAsyncReplication(TPosition pos, const TString& id, std::map<TString, TNodePtr>&& settings, const TObjectOperatorContext& context) - : TAsyncReplication(pos, id, "alterAsyncReplication", context) + : TAsyncReplication(pos, id, "alter", context) , Settings(std::move(settings)) { } diff --git a/ydb/library/yql/sql/v1/sql_query.cpp b/ydb/library/yql/sql/v1/sql_query.cpp index e1c827f473..aec8cc7905 100644 --- a/ydb/library/yql/sql/v1/sql_query.cpp +++ b/ydb/library/yql/sql/v1/sql_query.cpp @@ -40,21 +40,48 @@ void TSqlQuery::AddStatementToBlocks(TVector<TNodePtr>& blocks, TNodePtr node) { blocks.emplace_back(node); } -static bool AsyncReplicationSettingsEntry(std::map<TString, TNodePtr>& out, const TRule_replication_settings_entry& in, TTranslation& ctx) { - auto key = Id(in.GetRule_an_id1(), ctx); +static bool AsyncReplicationSettingsEntry(std::map<TString, TNodePtr>& out, + const TRule_replication_settings_entry& in, TTranslation& ctx, bool alter) +{ + auto key = IdEx(in.GetRule_an_id1(), ctx); auto value = BuildLiteralSmartString(ctx.Context(), ctx.Token(in.GetToken3())); - // TODO(ilnaz): validate - out.emplace(std::move(key), value); + + THashMap<TString, bool> settings = { + {"connection_string", false}, + {"endpoint", false}, + {"database", false}, + {"token", false}, + {"user", false}, + {"password", false}, + {"state", true}, + {"failover_mode", true}, + }; + + auto it = settings.find(to_lower(key.Name)); + if (it == settings.end()) { + ctx.Context().Error() << "Unknown replication setting: " << key.Name; + return false; + } else if (alter != it->second) { + ctx.Context().Error() << key.Name << " is not supported in " << (alter ? "ALTER" : "CREATE"); + return false; + } + + if (!out.emplace(it->first, value).second) { + ctx.Context().Error() << "Duplicate replication setting: " << key.Name; + } + return true; } -static bool AsyncReplicationSettings(std::map<TString, TNodePtr>& out, const TRule_replication_settings& in, TTranslation& ctx) { - if (!AsyncReplicationSettingsEntry(out, in.GetRule_replication_settings_entry1(), ctx)) { +static bool AsyncReplicationSettings(std::map<TString, TNodePtr>& out, + const TRule_replication_settings& in, TTranslation& ctx, bool alter = false) +{ + if (!AsyncReplicationSettingsEntry(out, in.GetRule_replication_settings_entry1(), ctx, alter)) { return false; } for (auto& block : in.GetBlock2()) { - if (!AsyncReplicationSettingsEntry(out, block.GetRule_replication_settings_entry2(), ctx)) { + if (!AsyncReplicationSettingsEntry(out, block.GetRule_replication_settings_entry2(), ctx, alter)) { return false; } } @@ -62,16 +89,20 @@ static bool AsyncReplicationSettings(std::map<TString, TNodePtr>& out, const TRu return true; } -static bool AsyncReplicationTarget(std::vector<std::pair<TString, TString>>& out, const TRule_replication_target& in, TTranslation& ctx) { +static bool AsyncReplicationTarget(std::vector<std::pair<TString, TString>>& out, TStringBuf prefixPath, + const TRule_replication_target& in, TTranslation& ctx) +{ const TString remote = Id(in.GetRule_object_ref1().GetRule_id_or_at2(), ctx).second; const TString local = Id(in.GetRule_object_ref3().GetRule_id_or_at2(), ctx).second; - out.emplace_back(remote, local); + out.emplace_back(remote, BuildTablePath(prefixPath, local)); return true; } -static bool AsyncReplicationAlterAction(std::map<TString, TNodePtr>& settings, const TRule_alter_replication_action& in, TTranslation& ctx) { +static bool AsyncReplicationAlterAction(std::map<TString, TNodePtr>& settings, + const TRule_alter_replication_action& in, TTranslation& ctx) +{ // TODO(ilnaz): support other actions - return AsyncReplicationSettings(settings, in.GetRule_alter_replication_set_setting1().GetRule_replication_settings3(), ctx); + return AsyncReplicationSettings(settings, in.GetRule_alter_replication_set_setting1().GetRule_replication_settings3(), ctx, true); } bool TSqlQuery::Statement(TVector<TNodePtr>& blocks, const TRule_sql_stmt_core& core) { @@ -899,12 +930,14 @@ bool TSqlQuery::Statement(TVector<TNodePtr>& blocks, const TRule_sql_stmt_core& } } + auto prefixPath = Ctx.GetPrefixPath(context.ServiceId, context.Cluster); + std::vector<std::pair<TString, TString>> targets; - if (!AsyncReplicationTarget(targets, node.GetRule_replication_target6(), *this)) { + if (!AsyncReplicationTarget(targets, prefixPath, node.GetRule_replication_target6(), *this)) { return false; } for (auto& block : node.GetBlock7()) { - if (!AsyncReplicationTarget(targets, block.GetRule_replication_target2(), *this)) { + if (!AsyncReplicationTarget(targets, prefixPath, block.GetRule_replication_target2(), *this)) { return false; } } @@ -915,7 +948,8 @@ bool TSqlQuery::Statement(TVector<TNodePtr>& blocks, const TRule_sql_stmt_core& } const TString id = Id(node.GetRule_object_ref4().GetRule_id_or_at2(), *this).second; - AddStatementToBlocks(blocks, BuildCreateAsyncReplication(Ctx.Pos(), id, std::move(targets), std::move(settings), context)); + AddStatementToBlocks(blocks, BuildCreateAsyncReplication(Ctx.Pos(), BuildTablePath(prefixPath, id), + std::move(targets), std::move(settings), context)); break; } case TRule_sql_stmt_core::kAltSqlStmtCore34: { @@ -930,7 +964,9 @@ bool TSqlQuery::Statement(TVector<TNodePtr>& blocks, const TRule_sql_stmt_core& } const TString id = Id(node.GetRule_object_ref4().GetRule_id_or_at2(), *this).second; - AddStatementToBlocks(blocks, BuildDropAsyncReplication(Ctx.Pos(), id, node.HasBlock5(), context)); + AddStatementToBlocks(blocks, BuildDropAsyncReplication(Ctx.Pos(), + BuildTablePath(Ctx.GetPrefixPath(context.ServiceId, context.Cluster), id), + node.HasBlock5(), context)); break; } case TRule_sql_stmt_core::kAltSqlStmtCore35: { @@ -1209,7 +1245,9 @@ bool TSqlQuery::Statement(TVector<TNodePtr>& blocks, const TRule_sql_stmt_core& } const TString id = Id(node.GetRule_object_ref4().GetRule_id_or_at2(), *this).second; - AddStatementToBlocks(blocks, BuildAlterAsyncReplication(Ctx.Pos(), id, std::move(settings), context)); + AddStatementToBlocks(blocks, BuildAlterAsyncReplication(Ctx.Pos(), + BuildTablePath(Ctx.GetPrefixPath(context.ServiceId, context.Cluster), id), + std::move(settings), context)); break; } case TRule_sql_stmt_core::ALT_NOT_SET: diff --git a/ydb/library/yql/sql/v1/sql_ut.cpp b/ydb/library/yql/sql/v1/sql_ut.cpp index ac53b5f914..751a56aa3b 100644 --- a/ydb/library/yql/sql/v1/sql_ut.cpp +++ b/ydb/library/yql/sql/v1/sql_ut.cpp @@ -2559,6 +2559,7 @@ Y_UNIT_TEST_SUITE(SqlParsingOnly) { CREATE ASYNC REPLICATION MyReplication FOR table1 AS table2, table3 AS table4 WITH ( + CONNECTION_STRING = "grpc://localhost:2135/?database=/MyDatabase", ENDPOINT = "localhost:2135", DATABASE = "/MyDatabase" ); @@ -2569,14 +2570,16 @@ Y_UNIT_TEST_SUITE(SqlParsingOnly) { TVerifyLineFunc verifyLine = [](const TString& word, const TString& line) { if (word == "Write") { UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("MyReplication")); - UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("createAsyncReplication")); + UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("create")); UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("table1")); UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("table2")); UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("table3")); UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("table4")); - UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("ENDPOINT")); + UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("connection_string")); + UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("grpc://localhost:2135/?database=/MyDatabase")); + UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("endpoint")); UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("localhost:2135")); - UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("DATABASE")); + UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("database")); UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("/MyDatabase")); } }; @@ -2587,6 +2590,29 @@ Y_UNIT_TEST_SUITE(SqlParsingOnly) { UNIT_ASSERT_VALUES_EQUAL(1, elementStat["Write"]); } + Y_UNIT_TEST(CreateAsyncReplicationUnsupportedSettings) { + auto reqTpl = R"( + USE plato; + CREATE ASYNC REPLICATION MyReplication + FOR table1 AS table2, table3 AS table4 + WITH ( + %s = "%s" + ) + )"; + + auto settings = THashMap<TString, TString>{ + {"STATE", "DONE"}, + {"FAILOVER_MODE", "FORCE"}, + }; + + for (const auto& [k, v] : settings) { + auto req = Sprintf(reqTpl, k.c_str(), v.c_str()); + auto res = SqlToYql(req); + UNIT_ASSERT(!res.Root); + UNIT_ASSERT_NO_DIFF(Err2Str(res), Sprintf("<main>:6:%zu: Error: %s is not supported in CREATE\n", 20 + k.size(), k.c_str())); + } + } + Y_UNIT_TEST(AlterAsyncReplicationParseCorrect) { auto req = R"( USE plato; @@ -2602,10 +2628,10 @@ Y_UNIT_TEST_SUITE(SqlParsingOnly) { TVerifyLineFunc verifyLine = [](const TString& word, const TString& line) { if (word == "Write") { UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("MyReplication")); - UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("alterAsyncReplication")); - UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("STATE")); + UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("alter")); + UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("state")); UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("DONE")); - UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("FAILOVER_MODE")); + UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("failover_mode")); UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("FORCE")); } }; @@ -2616,6 +2642,42 @@ Y_UNIT_TEST_SUITE(SqlParsingOnly) { UNIT_ASSERT_VALUES_EQUAL(1, elementStat["Write"]); } + Y_UNIT_TEST(AlterAsyncReplicationUnsupportedSettings) { + auto reqTpl = R"( + USE plato; + ALTER ASYNC REPLICATION MyReplication + SET ( + %s = "%s" + ) + )"; + + auto settings = THashMap<TString, TString>{ + {"CONNECTION_STRING", "grpc://localhost:2135/?database=/MyDatabase"}, + {"ENDPOINT", "localhost:2135"}, + {"DATABASE", "/MyDatabase"}, + {"TOKEN", "foo"}, + {"USER", "user"}, + {"PASSWORD", "bar"}, + }; + + for (const auto& [k, v] : settings) { + auto req = Sprintf(reqTpl, k.c_str(), v.c_str()); + auto res = SqlToYql(req); + UNIT_ASSERT(!res.Root); + UNIT_ASSERT_NO_DIFF(Err2Str(res), Sprintf("<main>:5:%zu: Error: %s is not supported in ALTER\n", 20 + k.size(), k.c_str())); + } + } + + Y_UNIT_TEST(AsyncReplicationInvalidSettings) { + auto req = R"( + USE plato; + ALTER ASYNC REPLICATION MyReplication SET (FOO = "BAR"); + )"; + auto res = SqlToYql(req); + UNIT_ASSERT(!res.Root); + UNIT_ASSERT_NO_DIFF(Err2Str(res), "<main>:3:62: Error: Unknown replication setting: FOO\n"); + } + Y_UNIT_TEST(DropAsyncReplicationParseCorrect) { auto req = R"( USE plato; @@ -2627,8 +2689,7 @@ Y_UNIT_TEST_SUITE(SqlParsingOnly) { TVerifyLineFunc verifyLine = [](const TString& word, const TString& line) { if (word == "Write") { UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("MyReplication")); - UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("dropAsyncReplication")); - UNIT_ASSERT_VALUES_EQUAL(TString::npos, line.find("cascade")); + UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("drop")); } }; @@ -2648,7 +2709,7 @@ Y_UNIT_TEST_SUITE(SqlParsingOnly) { TVerifyLineFunc verifyLine = [](const TString& word, const TString& line) { if (word == "Write") { - UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("cascade")); + UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("dropCascade")); } }; |