diff options
author | ilnaz <ilnaz@ydb.tech> | 2023-03-09 11:14:14 +0300 |
---|---|---|
committer | ilnaz <ilnaz@ydb.tech> | 2023-03-09 11:14:14 +0300 |
commit | f9027a68c7299e928dd71fadad4556f89650e3aa (patch) | |
tree | 226d179e7c051245c3e48fb3001b2bbc73a121bc | |
parent | 984d827f9e03fdc9c473e5f26b8397155ae4fc87 (diff) | |
download | ydb-f9027a68c7299e928dd71fadad4556f89650e3aa.tar.gz |
CREATE/DROP ASYNC REPLICATION syntax
-rw-r--r-- | ydb/library/yql/sql/v1/SQLv1.g.in | 16 | ||||
-rw-r--r-- | ydb/library/yql/sql/v1/format/sql_format.cpp | 35 | ||||
-rw-r--r-- | ydb/library/yql/sql/v1/format/sql_format_ut.cpp | 14 | ||||
-rw-r--r-- | ydb/library/yql/sql/v1/node.h | 5 | ||||
-rw-r--r-- | ydb/library/yql/sql/v1/query.cpp | 121 | ||||
-rw-r--r-- | ydb/library/yql/sql/v1/sql.cpp | 91 | ||||
-rw-r--r-- | ydb/library/yql/sql/v1/sql_ut.cpp | 81 |
7 files changed, 341 insertions, 22 deletions
diff --git a/ydb/library/yql/sql/v1/SQLv1.g.in b/ydb/library/yql/sql/v1/SQLv1.g.in index 1b2bba95de..927d94da21 100644 --- a/ydb/library/yql/sql/v1/SQLv1.g.in +++ b/ydb/library/yql/sql/v1/SQLv1.g.in @@ -49,6 +49,8 @@ sql_stmt_core: | drop_object_stmt | create_external_data_source_stmt | drop_external_data_source_stmt + | create_replication_stmt + | drop_replication_stmt ; expr: @@ -608,6 +610,17 @@ drop_role_stmt: DROP (USER|GROUP) (IF EXISTS)? role_name (COMMA role_name)* COMM role_name: an_id_or_type | bind_parameter; create_user_option: ENCRYPTED? PASSWORD expr; +create_replication_stmt: CREATE ASYNC REPLICATION object_ref + FOR replication_target (COMMA replication_target)* + WITH LPAREN replication_settings RPAREN +; + +replication_target: object_ref AS object_ref; +replication_settings: replication_settings_entry (COMMA replication_settings_entry)*; +replication_settings_entry: an_id EQUALS STRING_VALUE; + +drop_replication_stmt: DROP ASYNC REPLICATION object_ref CASCADE?; + action_or_subquery_args: opt_bind_parameter (COMMA opt_bind_parameter)*; define_action_or_subquery_stmt: DEFINE (ACTION|SUBQUERY) bind_parameter LPAREN action_or_subquery_args? RPAREN AS define_action_or_subquery_body END DEFINE; @@ -896,6 +909,7 @@ keyword_hint_uncompat: keyword_as_compat: DATA + | REPLICATION | SOURCE | TYPE ; @@ -1023,6 +1037,7 @@ keyword_compat: ( | RELEASE | RENAME | REPLACE + | REPLICATION | RESET | RESPECT | RESTRICT @@ -1332,6 +1347,7 @@ RELEASE: R E L E A S E; RENAME: R E N A M E; REPEATABLE: R E P E A T A B L E; REPLACE: R E P L A C E; +REPLICATION: R E P L I C A T I O N; RESET: R E S E T; RESOURCE: R E S O U R C E; RESPECT: R E S P E C T; diff --git a/ydb/library/yql/sql/v1/format/sql_format.cpp b/ydb/library/yql/sql/v1/format/sql_format.cpp index 96227ff72b..501f4beb5e 100644 --- a/ydb/library/yql/sql/v1/format/sql_format.cpp +++ b/ydb/library/yql/sql/v1/format/sql_format.cpp @@ -1077,17 +1077,30 @@ private: VisitAllFields(TRule_drop_object_stmt::GetDescriptor(), msg); } - void VisitCreateExternalDataSource(const TRule_create_external_data_source_stmt& msg) {
- PosFromToken(msg.GetToken1());
- NewLine();
- VisitAllFields(TRule_create_external_data_source_stmt::GetDescriptor(), msg);
- }
-
+ void VisitCreateExternalDataSource(const TRule_create_external_data_source_stmt& msg) { + PosFromToken(msg.GetToken1()); + NewLine(); + VisitAllFields(TRule_create_external_data_source_stmt::GetDescriptor(), msg); + } + void VisitDropExternalDataSource(const TRule_drop_external_data_source_stmt& msg) { - MarkAsSimple();
- PosFromToken(msg.GetToken1());
- NewLine();
- VisitAllFields(TRule_drop_external_data_source_stmt::GetDescriptor(), msg);
+ MarkAsSimple(); + PosFromToken(msg.GetToken1()); + NewLine(); + VisitAllFields(TRule_drop_external_data_source_stmt::GetDescriptor(), msg); + } + + void VisitCreateAsyncReplication(const TRule_create_replication_stmt& msg) { + PosFromToken(msg.GetToken1()); + NewLine(); + VisitAllFields(TRule_create_replication_stmt::GetDescriptor(), msg); + } + + void VisitDropAsyncReplication(const TRule_drop_replication_stmt& msg) { + MarkAsSimple(); + PosFromToken(msg.GetToken1()); + NewLine(); + VisitAllFields(TRule_drop_replication_stmt::GetDescriptor(), msg); } template <void (TVisitor::*Func)(const NProtoBuf::Message&)> @@ -1990,6 +2003,8 @@ TStaticData::TStaticData() {TRule_drop_object_stmt::GetDescriptor(), MakeFunctor(&TVisitor::VisitDropObject)}, {TRule_create_external_data_source_stmt::GetDescriptor(), MakeFunctor(&TVisitor::VisitCreateExternalDataSource)}, {TRule_drop_external_data_source_stmt::GetDescriptor(), MakeFunctor(&TVisitor::VisitDropExternalDataSource)}, + {TRule_create_replication_stmt::GetDescriptor(), MakeFunctor(&TVisitor::VisitCreateAsyncReplication)}, + {TRule_drop_replication_stmt::GetDescriptor(), MakeFunctor(&TVisitor::VisitDropAsyncReplication)}, }) { // ensure that all statements has a visitor diff --git a/ydb/library/yql/sql/v1/format/sql_format_ut.cpp b/ydb/library/yql/sql/v1/format/sql_format_ut.cpp index 6b6befeb09..3122409979 100644 --- a/ydb/library/yql/sql/v1/format/sql_format_ut.cpp +++ b/ydb/library/yql/sql/v1/format/sql_format_ut.cpp @@ -285,6 +285,20 @@ Y_UNIT_TEST_SUITE(CheckSqlFormatter) { setup.Run(cases); } + Y_UNIT_TEST(AsyncReplication) { + TCases cases = { + {"create async replication user for table1 AS table2 with (user='foo')", + "CREATE ASYNC REPLICATION user FOR table1 AS table2 WITH (user = 'foo');\n\n"}, + {"drop async replication user", + "DROP ASYNC REPLICATION user;\n"}, + {"drop async replication user cascade", + "DROP ASYNC REPLICATION user CASCADE;\n"}, + }; + + TSetup setup; + setup.Run(cases); + } + Y_UNIT_TEST(TypeSelection) { TCases cases = { {"Select tYpe.* frOm Table tYpe", diff --git a/ydb/library/yql/sql/v1/node.h b/ydb/library/yql/sql/v1/node.h index ec613cd5cc..3cd9dd6c7e 100644 --- a/ydb/library/yql/sql/v1/node.h +++ b/ydb/library/yql/sql/v1/node.h @@ -1412,6 +1412,11 @@ namespace NSQLTranslationV1 { std::map<TString, TDeferredAtom>&& features, const TObjectOperatorContext& context); TNodePtr BuildDropObjectOperation(TPosition pos, const TString& secretId, const TString& typeId, std::map<TString, TDeferredAtom>&& options, const TObjectOperatorContext& context); + TNodePtr BuildCreateAsyncReplication(TPosition pos, const TString& id, + std::vector<std::pair<TString, TString>>&& targets, + std::map<TString, TNodePtr>&& settings, + const TObjectOperatorContext& context); + TNodePtr BuildDropAsyncReplication(TPosition pos, const TString& id, bool cascade, const TObjectOperatorContext& context); TNodePtr BuildWriteTable(TPosition pos, const TString& label, const TTableRef& table, EWriteColumnMode mode, TNodePtr options, TScopedStatePtr scoped); TNodePtr BuildWriteResult(TPosition pos, const TString& label, TNodePtr settings); diff --git a/ydb/library/yql/sql/v1/query.cpp b/ydb/library/yql/sql/v1/query.cpp index beb67703fb..737aed2c6a 100644 --- a/ydb/library/yql/sql/v1/query.cpp +++ b/ydb/library/yql/sql/v1/query.cpp @@ -1532,6 +1532,127 @@ TNodePtr BuildDropRoles(TPosition pos, const TString& service, const TDeferredAt return new TDropRoles(pos, service, cluster, toDrop, isUser, force, scoped); } +class TAsyncReplication + : public TAstListNode + , protected TObjectOperatorContext +{ +protected: + virtual INode::TPtr FillOptions(INode::TPtr options) const = 0; + +public: + explicit TAsyncReplication(TPosition pos, const TString& id, const TString& mode, const TObjectOperatorContext& context) + : TAstListNode(pos) + , TObjectOperatorContext(context) + , Id(id) + , Mode(mode) + { + } + + bool DoInit(TContext& ctx, ISource* src) override { + Scoped->UseCluster(ServiceId, Cluster); + + auto keys = Y("Key", Q(Y(Q("id"), Y("String", BuildQuotedAtom(Pos, Id))))); + auto options = FillOptions(Y(Q(Y(Q("mode"), Q(Mode))))); + + Add("block", Q(Y( + Y("let", "sink", Y("DataSink", BuildQuotedAtom(Pos, ServiceId), Scoped->WrapCluster(Cluster, ctx))), + Y("let", "world", Y(TString(WriteName), "world", "sink", keys, Y("Void"), Q(options))), + Y("return", ctx.PragmaAutoCommit ? Y(TString(CommitName), "world", "sink") : AstNode("world")) + ))); + + return TAstListNode::DoInit(ctx, src); + } + + TPtr DoClone() const final { + return {}; + } + +private: + const TString Id; + const TString Mode; + +}; // TAsyncReplication + +class TCreateAsyncReplication final: public TAsyncReplication { +public: + explicit TCreateAsyncReplication(TPosition pos, const TString& id, + std::vector<std::pair<TString, TString>>&& targets, + std::map<TString, TNodePtr>&& settings, + const TObjectOperatorContext& context) + : TAsyncReplication(pos, id, "createAsyncReplication", context) + , Targets(std::move(targets)) + , Settings(std::move(settings)) + { + } + +protected: + INode::TPtr FillOptions(INode::TPtr options) const override { + if (!Targets.empty()) { + auto targets = Y(); + for (auto&& [remote, local] : Targets) { + auto target = Y(); + target = L(target, Q(Y(Q("remote"), Q(remote)))); + target = L(target, Q(Y(Q("local"), Q(local)))); + targets = L(targets, Q(target)); + } + options = L(options, Q(Y(Q("targets"), Q(targets)))); + } + + if (!Settings.empty()) { + auto settings = Y(); + for (auto&& [k, v] : Settings) { + if (v) { + settings = L(settings, Q(Y(BuildQuotedAtom(Pos, k), v))); + } else { + settings = L(settings, Q(Y(BuildQuotedAtom(Pos, k)))); + } + } + options = L(options, Q(Y(Q("settings"), Q(settings)))); + } + + return options; + } + +private: + std::vector<std::pair<TString, TString>> Targets; // (remote, local) + std::map<TString, TNodePtr> Settings; + +}; // TCreateAsyncReplication + +TNodePtr BuildCreateAsyncReplication(TPosition pos, const TString& id, + std::vector<std::pair<TString, TString>>&& targets, + std::map<TString, TNodePtr>&& settings, + const TObjectOperatorContext& context) +{ + return new TCreateAsyncReplication(pos, id, std::move(targets), std::move(settings), context); +} + +class TDropAsyncReplication final: public TAsyncReplication { +public: + explicit TDropAsyncReplication(TPosition pos, const TString& id, bool cascade, const TObjectOperatorContext& context) + : TAsyncReplication(pos, id, "dropAsyncReplication", context) + , Cascade(cascade) + { + } + +protected: + INode::TPtr FillOptions(INode::TPtr options) const override { + if (Cascade) { + options = L(options, Q(Y(Q("cascade")))); + } + + return options; + } + +private: + const bool Cascade; + +}; // TDropAsyncReplication + +TNodePtr BuildDropAsyncReplication(TPosition pos, const TString& id, bool cascade, const TObjectOperatorContext& context) { + return new TDropAsyncReplication(pos, id, cascade, context); +} + static const TMap<EWriteColumnMode, TString> columnModeToStrMapMR { {EWriteColumnMode::Default, ""}, {EWriteColumnMode::Insert, "append"}, diff --git a/ydb/library/yql/sql/v1/sql.cpp b/ydb/library/yql/sql/v1/sql.cpp index 8fd084af80..4b3d272051 100644 --- a/ydb/library/yql/sql/v1/sql.cpp +++ b/ydb/library/yql/sql/v1/sql.cpp @@ -8685,6 +8685,34 @@ void TSqlQuery::AddStatementToBlocks(TVector<TNodePtr>& blocks, TNodePtr node) { blocks.emplace_back(node); } +static bool AsyncReplicationSettingsEntry(std::map<TString, TNodePtr>& out, const TRule_replication_settings_entry& in, TTranslation& ctx) { + auto key = Id(in.GetRule_an_id1(), ctx); + auto value = BuildLiteralSmartString(ctx.Context(), ctx.Token(in.GetToken3())); + // TODO(ilnaz): validate + out.emplace(std::move(key), value); + return true; +} + +static bool AsyncReplicationSettings(std::map<TString, TNodePtr>& out, const TRule_replication_settings& in, TTranslation& ctx) { + if (!AsyncReplicationSettingsEntry(out, in.GetRule_replication_settings_entry1(), ctx)) { + return false; + } + + for (auto& block : in.GetBlock2()) { + if (!AsyncReplicationSettingsEntry(out, block.GetRule_replication_settings_entry2(), ctx)) { + return false; + } + } + + return true; +} + +static bool AsyncReplicationTarget(std::vector<std::pair<TString, TString>>& out, const TRule_replication_target& in, TTranslation& ctx) { + const TString remote = Id(in.GetRule_object_ref1().GetRule_id_or_at2(), ctx).second; + const TString local = Id(in.GetRule_object_ref3().GetRule_id_or_at2(), ctx).second; + out.emplace_back(remote, local); + return true; +} bool TSqlQuery::Statement(TVector<TNodePtr>& blocks, const TRule_sql_stmt_core& core) { TString internalStatementName; @@ -9152,8 +9180,7 @@ bool TSqlQuery::Statement(TVector<TNodePtr>& blocks, const TRule_sql_stmt_core& AddStatementToBlocks(blocks, stmt); break; } - case TRule_sql_stmt_core::kAltSqlStmtCore25: - { + case TRule_sql_stmt_core::kAltSqlStmtCore25: { // drop_role_stmt: DROP (USER|GROUP) (IF EXISTS)? role_name (COMMA role_name)* COMMA?; Ctx.BodyPart(); auto& node = core.GetAlt_sql_stmt_core25().GetRule_drop_role_stmt1(); @@ -9188,8 +9215,7 @@ bool TSqlQuery::Statement(TVector<TNodePtr>& blocks, const TRule_sql_stmt_core& AddStatementToBlocks(blocks, BuildDropRoles(pos, service, cluster, roles, isUser, force, Ctx.Scoped)); break; } - case TRule_sql_stmt_core::kAltSqlStmtCore26: - { + case TRule_sql_stmt_core::kAltSqlStmtCore26: { // create_object_stmt: CREATE OBJECT name (TYPE type [WITH k=v,...]); auto& node = core.GetAlt_sql_stmt_core26().GetRule_create_object_stmt1(); TObjectOperatorContext context(Ctx.Scoped); @@ -9212,8 +9238,7 @@ bool TSqlQuery::Statement(TVector<TNodePtr>& blocks, const TRule_sql_stmt_core& AddStatementToBlocks(blocks, BuildCreateObjectOperation(Ctx.Pos(), objectId, typeId, std::move(kv), context)); break; } - case TRule_sql_stmt_core::kAltSqlStmtCore27: - { + case TRule_sql_stmt_core::kAltSqlStmtCore27: { // alter_object_stmt: ALTER OBJECT name (TYPE type [SET k=v,...]); auto& node = core.GetAlt_sql_stmt_core27().GetRule_alter_object_stmt1(); TObjectOperatorContext context(Ctx.Scoped); @@ -9234,8 +9259,7 @@ bool TSqlQuery::Statement(TVector<TNodePtr>& blocks, const TRule_sql_stmt_core& AddStatementToBlocks(blocks, BuildAlterObjectOperation(Ctx.Pos(), objectId, typeId, std::move(kv), context)); break; } - case TRule_sql_stmt_core::kAltSqlStmtCore28: - { + case TRule_sql_stmt_core::kAltSqlStmtCore28: { // drop_object_stmt: DROP OBJECT name (TYPE type [WITH k=v,...]); auto& node = core.GetAlt_sql_stmt_core28().GetRule_drop_object_stmt1(); TObjectOperatorContext context(Ctx.Scoped); @@ -9258,8 +9282,7 @@ bool TSqlQuery::Statement(TVector<TNodePtr>& blocks, const TRule_sql_stmt_core& AddStatementToBlocks(blocks, BuildDropObjectOperation(Ctx.Pos(), objectId, typeId, std::move(kv), context)); break; } - case TRule_sql_stmt_core::kAltSqlStmtCore29: - { + case TRule_sql_stmt_core::kAltSqlStmtCore29: { // create_external_data_source_stmt: CREATE EXTERNAL DATA SOURCE name WITH (k=v,...); auto& node = core.GetAlt_sql_stmt_core29().GetRule_create_external_data_source_stmt1(); TObjectOperatorContext context(Ctx.Scoped); @@ -9279,8 +9302,7 @@ bool TSqlQuery::Statement(TVector<TNodePtr>& blocks, const TRule_sql_stmt_core& AddStatementToBlocks(blocks, BuildCreateObjectOperation(Ctx.Pos(), BuildTablePath(Ctx.GetPrefixPath(context.ServiceId, context.Cluster), objectId), "EXTERNAL_DATA_SOURCE", std::move(kv), context)); break; } - case TRule_sql_stmt_core::kAltSqlStmtCore30: - { + case TRule_sql_stmt_core::kAltSqlStmtCore30: { // drop_external_data_source_stmt: DROP EXTERNAL DATA SOURCE name; auto& node = core.GetAlt_sql_stmt_core30().GetRule_drop_external_data_source_stmt1(); TObjectOperatorContext context(Ctx.Scoped); @@ -9295,6 +9317,51 @@ bool TSqlQuery::Statement(TVector<TNodePtr>& blocks, const TRule_sql_stmt_core& AddStatementToBlocks(blocks, BuildDropObjectOperation(Ctx.Pos(), BuildTablePath(Ctx.GetPrefixPath(context.ServiceId, context.Cluster), objectId), "EXTERNAL_DATA_SOURCE", {}, context)); break; } + case TRule_sql_stmt_core::kAltSqlStmtCore31: { + // create_replication_stmt: CREATE ASYNC REPLICATION + auto& node = core.GetAlt_sql_stmt_core31().GetRule_create_replication_stmt1(); + TObjectOperatorContext context(Ctx.Scoped); + if (node.GetRule_object_ref4().HasBlock1()) { + const auto& cluster = node.GetRule_object_ref4().GetBlock1().GetRule_cluster_expr1(); + if (!ClusterExpr(cluster, false, context.ServiceId, context.Cluster)) { + return false; + } + } + + std::vector<std::pair<TString, TString>> targets; + if (!AsyncReplicationTarget(targets, node.GetRule_replication_target6(), *this)) { + return false; + } + for (auto& block : node.GetBlock7()) { + if (!AsyncReplicationTarget(targets, block.GetRule_replication_target2(), *this)) { + return false; + } + } + + std::map<TString, TNodePtr> settings; + if (!AsyncReplicationSettings(settings, node.GetRule_replication_settings10(), *this)) { + return false; + } + + const TString id = Id(node.GetRule_object_ref4().GetRule_id_or_at2(), *this).second; + AddStatementToBlocks(blocks, BuildCreateAsyncReplication(Ctx.Pos(), id, std::move(targets), std::move(settings), context)); + break; + } + case TRule_sql_stmt_core::kAltSqlStmtCore32: { + // drop_replication_stmt: DROP ASYNC REPLICATION + auto& node = core.GetAlt_sql_stmt_core32().GetRule_drop_replication_stmt1(); + TObjectOperatorContext context(Ctx.Scoped); + if (node.GetRule_object_ref4().HasBlock1()) { + const auto& cluster = node.GetRule_object_ref4().GetBlock1().GetRule_cluster_expr1(); + if (!ClusterExpr(cluster, false, context.ServiceId, context.Cluster)) { + return false; + } + } + + const TString id = Id(node.GetRule_object_ref4().GetRule_id_or_at2(), *this).second; + AddStatementToBlocks(blocks, BuildDropAsyncReplication(Ctx.Pos(), id, node.HasBlock5(), context)); + break; + } default: Ctx.IncrementMonCounter("sql_errors", "UnknownStatement" + internalStatementName); AltNotImplemented("sql_stmt_core", core); diff --git a/ydb/library/yql/sql/v1/sql_ut.cpp b/ydb/library/yql/sql/v1/sql_ut.cpp index 50a336c161..a9bf77cb2b 100644 --- a/ydb/library/yql/sql/v1/sql_ut.cpp +++ b/ydb/library/yql/sql/v1/sql_ut.cpp @@ -269,6 +269,11 @@ Y_UNIT_TEST_SUITE(SqlParsingOnly) { UNIT_ASSERT(SqlToYql("USE plato; SELECT CHANGEFEED FROM CHANGEFEED").IsOk()); } + Y_UNIT_TEST(ReplicationKeywordNotReservedForNames) { + UNIT_ASSERT(SqlToYql("USE plato; CREATE TABLE REPLICATION (REPLICATION Uint32, PRIMARY KEY (REPLICATION));").IsOk()); + UNIT_ASSERT(SqlToYql("USE plato; SELECT REPLICATION FROM REPLICATION").IsOk()); + } + Y_UNIT_TEST(Jubilee) { NYql::TAstParseResult res = SqlToYql("USE plato; INSERT INTO Arcadia (r2000000) VALUES (\"2M GET!!!\");"); UNIT_ASSERT(res.Root); @@ -4834,4 +4839,80 @@ Y_UNIT_TEST_SUITE(ExternalDeclares) { UNIT_ASSERT_VALUES_EQUAL(1, elementStat["Write"]); } + + Y_UNIT_TEST(CreateAsyncReplicationParseCorrect) { + auto req = R"( + USE plato; + CREATE ASYNC REPLICATION MyReplication + FOR table1 AS table2, table3 AS table4 + WITH ( + ENDPOINT = "localhost:2135", + DATABASE = "/MyDatabase" + ); + )"; + auto res = SqlToYql(req); + UNIT_ASSERT(res.Root); + + TVerifyLineFunc verifyLine = [](const TString& word, const TString& line) { + if (word == "Write") { + UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("MyReplication")); + UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("createAsyncReplication")); + UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("table1")); + UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("table2")); + UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("table3")); + UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("table4")); + UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("ENDPOINT")); + UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("localhost:2135")); + UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("DATABASE")); + UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("/MyDatabase")); + } + }; + + TWordCountHive elementStat = { {TString("Write"), 0}}; + VerifyProgram(res, elementStat, verifyLine); + + UNIT_ASSERT_VALUES_EQUAL(1, elementStat["Write"]); + } + + Y_UNIT_TEST(DropAsyncReplicationParseCorrect) { + auto req = R"( + USE plato; + DROP ASYNC REPLICATION MyReplication; + )"; + auto res = SqlToYql(req); + UNIT_ASSERT(res.Root); + + TVerifyLineFunc verifyLine = [](const TString& word, const TString& line) { + if (word == "Write") { + UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("MyReplication")); + UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("dropAsyncReplication")); + UNIT_ASSERT_VALUES_EQUAL(TString::npos, line.find("cascade")); + } + }; + + TWordCountHive elementStat = { {TString("Write"), 0}}; + VerifyProgram(res, elementStat, verifyLine); + + UNIT_ASSERT_VALUES_EQUAL(1, elementStat["Write"]); + } + + Y_UNIT_TEST(DropAsyncReplicationCascade) { + auto req = R"( + USE plato; + DROP ASYNC REPLICATION MyReplication CASCADE; + )"; + auto res = SqlToYql(req); + UNIT_ASSERT(res.Root); + + TVerifyLineFunc verifyLine = [](const TString& word, const TString& line) { + if (word == "Write") { + UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("cascade")); + } + }; + + TWordCountHive elementStat = { {TString("Write"), 0}}; + VerifyProgram(res, elementStat, verifyLine); + + UNIT_ASSERT_VALUES_EQUAL(1, elementStat["Write"]); + } } |