aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorilnaz <ilnaz@ydb.tech>2023-03-09 11:14:14 +0300
committerilnaz <ilnaz@ydb.tech>2023-03-09 11:14:14 +0300
commitf9027a68c7299e928dd71fadad4556f89650e3aa (patch)
tree226d179e7c051245c3e48fb3001b2bbc73a121bc
parent984d827f9e03fdc9c473e5f26b8397155ae4fc87 (diff)
downloadydb-f9027a68c7299e928dd71fadad4556f89650e3aa.tar.gz
CREATE/DROP ASYNC REPLICATION syntax
-rw-r--r--ydb/library/yql/sql/v1/SQLv1.g.in16
-rw-r--r--ydb/library/yql/sql/v1/format/sql_format.cpp35
-rw-r--r--ydb/library/yql/sql/v1/format/sql_format_ut.cpp14
-rw-r--r--ydb/library/yql/sql/v1/node.h5
-rw-r--r--ydb/library/yql/sql/v1/query.cpp121
-rw-r--r--ydb/library/yql/sql/v1/sql.cpp91
-rw-r--r--ydb/library/yql/sql/v1/sql_ut.cpp81
7 files changed, 341 insertions, 22 deletions
diff --git a/ydb/library/yql/sql/v1/SQLv1.g.in b/ydb/library/yql/sql/v1/SQLv1.g.in
index 1b2bba95de..927d94da21 100644
--- a/ydb/library/yql/sql/v1/SQLv1.g.in
+++ b/ydb/library/yql/sql/v1/SQLv1.g.in
@@ -49,6 +49,8 @@ sql_stmt_core:
| drop_object_stmt
| create_external_data_source_stmt
| drop_external_data_source_stmt
+ | create_replication_stmt
+ | drop_replication_stmt
;
expr:
@@ -608,6 +610,17 @@ drop_role_stmt: DROP (USER|GROUP) (IF EXISTS)? role_name (COMMA role_name)* COMM
role_name: an_id_or_type | bind_parameter;
create_user_option: ENCRYPTED? PASSWORD expr;
+create_replication_stmt: CREATE ASYNC REPLICATION object_ref
+ FOR replication_target (COMMA replication_target)*
+ WITH LPAREN replication_settings RPAREN
+;
+
+replication_target: object_ref AS object_ref;
+replication_settings: replication_settings_entry (COMMA replication_settings_entry)*;
+replication_settings_entry: an_id EQUALS STRING_VALUE;
+
+drop_replication_stmt: DROP ASYNC REPLICATION object_ref CASCADE?;
+
action_or_subquery_args: opt_bind_parameter (COMMA opt_bind_parameter)*;
define_action_or_subquery_stmt: DEFINE (ACTION|SUBQUERY) bind_parameter LPAREN action_or_subquery_args? RPAREN AS define_action_or_subquery_body END DEFINE;
@@ -896,6 +909,7 @@ keyword_hint_uncompat:
keyword_as_compat:
DATA
+ | REPLICATION
| SOURCE
| TYPE
;
@@ -1023,6 +1037,7 @@ keyword_compat: (
| RELEASE
| RENAME
| REPLACE
+ | REPLICATION
| RESET
| RESPECT
| RESTRICT
@@ -1332,6 +1347,7 @@ RELEASE: R E L E A S E;
RENAME: R E N A M E;
REPEATABLE: R E P E A T A B L E;
REPLACE: R E P L A C E;
+REPLICATION: R E P L I C A T I O N;
RESET: R E S E T;
RESOURCE: R E S O U R C E;
RESPECT: R E S P E C T;
diff --git a/ydb/library/yql/sql/v1/format/sql_format.cpp b/ydb/library/yql/sql/v1/format/sql_format.cpp
index 96227ff72b..501f4beb5e 100644
--- a/ydb/library/yql/sql/v1/format/sql_format.cpp
+++ b/ydb/library/yql/sql/v1/format/sql_format.cpp
@@ -1077,17 +1077,30 @@ private:
VisitAllFields(TRule_drop_object_stmt::GetDescriptor(), msg);
}
- void VisitCreateExternalDataSource(const TRule_create_external_data_source_stmt& msg) {
- PosFromToken(msg.GetToken1());
- NewLine();
- VisitAllFields(TRule_create_external_data_source_stmt::GetDescriptor(), msg);
- }
-
+ void VisitCreateExternalDataSource(const TRule_create_external_data_source_stmt& msg) {
+ PosFromToken(msg.GetToken1());
+ NewLine();
+ VisitAllFields(TRule_create_external_data_source_stmt::GetDescriptor(), msg);
+ }
+
void VisitDropExternalDataSource(const TRule_drop_external_data_source_stmt& msg) {
- MarkAsSimple();
- PosFromToken(msg.GetToken1());
- NewLine();
- VisitAllFields(TRule_drop_external_data_source_stmt::GetDescriptor(), msg);
+ MarkAsSimple();
+ PosFromToken(msg.GetToken1());
+ NewLine();
+ VisitAllFields(TRule_drop_external_data_source_stmt::GetDescriptor(), msg);
+ }
+
+ void VisitCreateAsyncReplication(const TRule_create_replication_stmt& msg) {
+ PosFromToken(msg.GetToken1());
+ NewLine();
+ VisitAllFields(TRule_create_replication_stmt::GetDescriptor(), msg);
+ }
+
+ void VisitDropAsyncReplication(const TRule_drop_replication_stmt& msg) {
+ MarkAsSimple();
+ PosFromToken(msg.GetToken1());
+ NewLine();
+ VisitAllFields(TRule_drop_replication_stmt::GetDescriptor(), msg);
}
template <void (TVisitor::*Func)(const NProtoBuf::Message&)>
@@ -1990,6 +2003,8 @@ TStaticData::TStaticData()
{TRule_drop_object_stmt::GetDescriptor(), MakeFunctor(&TVisitor::VisitDropObject)},
{TRule_create_external_data_source_stmt::GetDescriptor(), MakeFunctor(&TVisitor::VisitCreateExternalDataSource)},
{TRule_drop_external_data_source_stmt::GetDescriptor(), MakeFunctor(&TVisitor::VisitDropExternalDataSource)},
+ {TRule_create_replication_stmt::GetDescriptor(), MakeFunctor(&TVisitor::VisitCreateAsyncReplication)},
+ {TRule_drop_replication_stmt::GetDescriptor(), MakeFunctor(&TVisitor::VisitDropAsyncReplication)},
})
{
// ensure that all statements has a visitor
diff --git a/ydb/library/yql/sql/v1/format/sql_format_ut.cpp b/ydb/library/yql/sql/v1/format/sql_format_ut.cpp
index 6b6befeb09..3122409979 100644
--- a/ydb/library/yql/sql/v1/format/sql_format_ut.cpp
+++ b/ydb/library/yql/sql/v1/format/sql_format_ut.cpp
@@ -285,6 +285,20 @@ Y_UNIT_TEST_SUITE(CheckSqlFormatter) {
setup.Run(cases);
}
+ Y_UNIT_TEST(AsyncReplication) {
+ TCases cases = {
+ {"create async replication user for table1 AS table2 with (user='foo')",
+ "CREATE ASYNC REPLICATION user FOR table1 AS table2 WITH (user = 'foo');\n\n"},
+ {"drop async replication user",
+ "DROP ASYNC REPLICATION user;\n"},
+ {"drop async replication user cascade",
+ "DROP ASYNC REPLICATION user CASCADE;\n"},
+ };
+
+ TSetup setup;
+ setup.Run(cases);
+ }
+
Y_UNIT_TEST(TypeSelection) {
TCases cases = {
{"Select tYpe.* frOm Table tYpe",
diff --git a/ydb/library/yql/sql/v1/node.h b/ydb/library/yql/sql/v1/node.h
index ec613cd5cc..3cd9dd6c7e 100644
--- a/ydb/library/yql/sql/v1/node.h
+++ b/ydb/library/yql/sql/v1/node.h
@@ -1412,6 +1412,11 @@ namespace NSQLTranslationV1 {
std::map<TString, TDeferredAtom>&& features, const TObjectOperatorContext& context);
TNodePtr BuildDropObjectOperation(TPosition pos, const TString& secretId, const TString& typeId,
std::map<TString, TDeferredAtom>&& options, const TObjectOperatorContext& context);
+ TNodePtr BuildCreateAsyncReplication(TPosition pos, const TString& id,
+ std::vector<std::pair<TString, TString>>&& targets,
+ std::map<TString, TNodePtr>&& settings,
+ const TObjectOperatorContext& context);
+ TNodePtr BuildDropAsyncReplication(TPosition pos, const TString& id, bool cascade, const TObjectOperatorContext& context);
TNodePtr BuildWriteTable(TPosition pos, const TString& label, const TTableRef& table, EWriteColumnMode mode, TNodePtr options,
TScopedStatePtr scoped);
TNodePtr BuildWriteResult(TPosition pos, const TString& label, TNodePtr settings);
diff --git a/ydb/library/yql/sql/v1/query.cpp b/ydb/library/yql/sql/v1/query.cpp
index beb67703fb..737aed2c6a 100644
--- a/ydb/library/yql/sql/v1/query.cpp
+++ b/ydb/library/yql/sql/v1/query.cpp
@@ -1532,6 +1532,127 @@ TNodePtr BuildDropRoles(TPosition pos, const TString& service, const TDeferredAt
return new TDropRoles(pos, service, cluster, toDrop, isUser, force, scoped);
}
+class TAsyncReplication
+ : public TAstListNode
+ , protected TObjectOperatorContext
+{
+protected:
+ virtual INode::TPtr FillOptions(INode::TPtr options) const = 0;
+
+public:
+ explicit TAsyncReplication(TPosition pos, const TString& id, const TString& mode, const TObjectOperatorContext& context)
+ : TAstListNode(pos)
+ , TObjectOperatorContext(context)
+ , Id(id)
+ , Mode(mode)
+ {
+ }
+
+ bool DoInit(TContext& ctx, ISource* src) override {
+ Scoped->UseCluster(ServiceId, Cluster);
+
+ auto keys = Y("Key", Q(Y(Q("id"), Y("String", BuildQuotedAtom(Pos, Id)))));
+ auto options = FillOptions(Y(Q(Y(Q("mode"), Q(Mode)))));
+
+ Add("block", Q(Y(
+ Y("let", "sink", Y("DataSink", BuildQuotedAtom(Pos, ServiceId), Scoped->WrapCluster(Cluster, ctx))),
+ Y("let", "world", Y(TString(WriteName), "world", "sink", keys, Y("Void"), Q(options))),
+ Y("return", ctx.PragmaAutoCommit ? Y(TString(CommitName), "world", "sink") : AstNode("world"))
+ )));
+
+ return TAstListNode::DoInit(ctx, src);
+ }
+
+ TPtr DoClone() const final {
+ return {};
+ }
+
+private:
+ const TString Id;
+ const TString Mode;
+
+}; // TAsyncReplication
+
+class TCreateAsyncReplication final: public TAsyncReplication {
+public:
+ explicit TCreateAsyncReplication(TPosition pos, const TString& id,
+ std::vector<std::pair<TString, TString>>&& targets,
+ std::map<TString, TNodePtr>&& settings,
+ const TObjectOperatorContext& context)
+ : TAsyncReplication(pos, id, "createAsyncReplication", context)
+ , Targets(std::move(targets))
+ , Settings(std::move(settings))
+ {
+ }
+
+protected:
+ INode::TPtr FillOptions(INode::TPtr options) const override {
+ if (!Targets.empty()) {
+ auto targets = Y();
+ for (auto&& [remote, local] : Targets) {
+ auto target = Y();
+ target = L(target, Q(Y(Q("remote"), Q(remote))));
+ target = L(target, Q(Y(Q("local"), Q(local))));
+ targets = L(targets, Q(target));
+ }
+ options = L(options, Q(Y(Q("targets"), Q(targets))));
+ }
+
+ if (!Settings.empty()) {
+ auto settings = Y();
+ for (auto&& [k, v] : Settings) {
+ if (v) {
+ settings = L(settings, Q(Y(BuildQuotedAtom(Pos, k), v)));
+ } else {
+ settings = L(settings, Q(Y(BuildQuotedAtom(Pos, k))));
+ }
+ }
+ options = L(options, Q(Y(Q("settings"), Q(settings))));
+ }
+
+ return options;
+ }
+
+private:
+ std::vector<std::pair<TString, TString>> Targets; // (remote, local)
+ std::map<TString, TNodePtr> Settings;
+
+}; // TCreateAsyncReplication
+
+TNodePtr BuildCreateAsyncReplication(TPosition pos, const TString& id,
+ std::vector<std::pair<TString, TString>>&& targets,
+ std::map<TString, TNodePtr>&& settings,
+ const TObjectOperatorContext& context)
+{
+ return new TCreateAsyncReplication(pos, id, std::move(targets), std::move(settings), context);
+}
+
+class TDropAsyncReplication final: public TAsyncReplication {
+public:
+ explicit TDropAsyncReplication(TPosition pos, const TString& id, bool cascade, const TObjectOperatorContext& context)
+ : TAsyncReplication(pos, id, "dropAsyncReplication", context)
+ , Cascade(cascade)
+ {
+ }
+
+protected:
+ INode::TPtr FillOptions(INode::TPtr options) const override {
+ if (Cascade) {
+ options = L(options, Q(Y(Q("cascade"))));
+ }
+
+ return options;
+ }
+
+private:
+ const bool Cascade;
+
+}; // TDropAsyncReplication
+
+TNodePtr BuildDropAsyncReplication(TPosition pos, const TString& id, bool cascade, const TObjectOperatorContext& context) {
+ return new TDropAsyncReplication(pos, id, cascade, context);
+}
+
static const TMap<EWriteColumnMode, TString> columnModeToStrMapMR {
{EWriteColumnMode::Default, ""},
{EWriteColumnMode::Insert, "append"},
diff --git a/ydb/library/yql/sql/v1/sql.cpp b/ydb/library/yql/sql/v1/sql.cpp
index 8fd084af80..4b3d272051 100644
--- a/ydb/library/yql/sql/v1/sql.cpp
+++ b/ydb/library/yql/sql/v1/sql.cpp
@@ -8685,6 +8685,34 @@ void TSqlQuery::AddStatementToBlocks(TVector<TNodePtr>& blocks, TNodePtr node) {
blocks.emplace_back(node);
}
+static bool AsyncReplicationSettingsEntry(std::map<TString, TNodePtr>& out, const TRule_replication_settings_entry& in, TTranslation& ctx) {
+ auto key = Id(in.GetRule_an_id1(), ctx);
+ auto value = BuildLiteralSmartString(ctx.Context(), ctx.Token(in.GetToken3()));
+ // TODO(ilnaz): validate
+ out.emplace(std::move(key), value);
+ return true;
+}
+
+static bool AsyncReplicationSettings(std::map<TString, TNodePtr>& out, const TRule_replication_settings& in, TTranslation& ctx) {
+ if (!AsyncReplicationSettingsEntry(out, in.GetRule_replication_settings_entry1(), ctx)) {
+ return false;
+ }
+
+ for (auto& block : in.GetBlock2()) {
+ if (!AsyncReplicationSettingsEntry(out, block.GetRule_replication_settings_entry2(), ctx)) {
+ return false;
+ }
+ }
+
+ return true;
+}
+
+static bool AsyncReplicationTarget(std::vector<std::pair<TString, TString>>& out, const TRule_replication_target& in, TTranslation& ctx) {
+ const TString remote = Id(in.GetRule_object_ref1().GetRule_id_or_at2(), ctx).second;
+ const TString local = Id(in.GetRule_object_ref3().GetRule_id_or_at2(), ctx).second;
+ out.emplace_back(remote, local);
+ return true;
+}
bool TSqlQuery::Statement(TVector<TNodePtr>& blocks, const TRule_sql_stmt_core& core) {
TString internalStatementName;
@@ -9152,8 +9180,7 @@ bool TSqlQuery::Statement(TVector<TNodePtr>& blocks, const TRule_sql_stmt_core&
AddStatementToBlocks(blocks, stmt);
break;
}
- case TRule_sql_stmt_core::kAltSqlStmtCore25:
- {
+ case TRule_sql_stmt_core::kAltSqlStmtCore25: {
// drop_role_stmt: DROP (USER|GROUP) (IF EXISTS)? role_name (COMMA role_name)* COMMA?;
Ctx.BodyPart();
auto& node = core.GetAlt_sql_stmt_core25().GetRule_drop_role_stmt1();
@@ -9188,8 +9215,7 @@ bool TSqlQuery::Statement(TVector<TNodePtr>& blocks, const TRule_sql_stmt_core&
AddStatementToBlocks(blocks, BuildDropRoles(pos, service, cluster, roles, isUser, force, Ctx.Scoped));
break;
}
- case TRule_sql_stmt_core::kAltSqlStmtCore26:
- {
+ case TRule_sql_stmt_core::kAltSqlStmtCore26: {
// create_object_stmt: CREATE OBJECT name (TYPE type [WITH k=v,...]);
auto& node = core.GetAlt_sql_stmt_core26().GetRule_create_object_stmt1();
TObjectOperatorContext context(Ctx.Scoped);
@@ -9212,8 +9238,7 @@ bool TSqlQuery::Statement(TVector<TNodePtr>& blocks, const TRule_sql_stmt_core&
AddStatementToBlocks(blocks, BuildCreateObjectOperation(Ctx.Pos(), objectId, typeId, std::move(kv), context));
break;
}
- case TRule_sql_stmt_core::kAltSqlStmtCore27:
- {
+ case TRule_sql_stmt_core::kAltSqlStmtCore27: {
// alter_object_stmt: ALTER OBJECT name (TYPE type [SET k=v,...]);
auto& node = core.GetAlt_sql_stmt_core27().GetRule_alter_object_stmt1();
TObjectOperatorContext context(Ctx.Scoped);
@@ -9234,8 +9259,7 @@ bool TSqlQuery::Statement(TVector<TNodePtr>& blocks, const TRule_sql_stmt_core&
AddStatementToBlocks(blocks, BuildAlterObjectOperation(Ctx.Pos(), objectId, typeId, std::move(kv), context));
break;
}
- case TRule_sql_stmt_core::kAltSqlStmtCore28:
- {
+ case TRule_sql_stmt_core::kAltSqlStmtCore28: {
// drop_object_stmt: DROP OBJECT name (TYPE type [WITH k=v,...]);
auto& node = core.GetAlt_sql_stmt_core28().GetRule_drop_object_stmt1();
TObjectOperatorContext context(Ctx.Scoped);
@@ -9258,8 +9282,7 @@ bool TSqlQuery::Statement(TVector<TNodePtr>& blocks, const TRule_sql_stmt_core&
AddStatementToBlocks(blocks, BuildDropObjectOperation(Ctx.Pos(), objectId, typeId, std::move(kv), context));
break;
}
- case TRule_sql_stmt_core::kAltSqlStmtCore29:
- {
+ case TRule_sql_stmt_core::kAltSqlStmtCore29: {
// create_external_data_source_stmt: CREATE EXTERNAL DATA SOURCE name WITH (k=v,...);
auto& node = core.GetAlt_sql_stmt_core29().GetRule_create_external_data_source_stmt1();
TObjectOperatorContext context(Ctx.Scoped);
@@ -9279,8 +9302,7 @@ bool TSqlQuery::Statement(TVector<TNodePtr>& blocks, const TRule_sql_stmt_core&
AddStatementToBlocks(blocks, BuildCreateObjectOperation(Ctx.Pos(), BuildTablePath(Ctx.GetPrefixPath(context.ServiceId, context.Cluster), objectId), "EXTERNAL_DATA_SOURCE", std::move(kv), context));
break;
}
- case TRule_sql_stmt_core::kAltSqlStmtCore30:
- {
+ case TRule_sql_stmt_core::kAltSqlStmtCore30: {
// drop_external_data_source_stmt: DROP EXTERNAL DATA SOURCE name;
auto& node = core.GetAlt_sql_stmt_core30().GetRule_drop_external_data_source_stmt1();
TObjectOperatorContext context(Ctx.Scoped);
@@ -9295,6 +9317,51 @@ bool TSqlQuery::Statement(TVector<TNodePtr>& blocks, const TRule_sql_stmt_core&
AddStatementToBlocks(blocks, BuildDropObjectOperation(Ctx.Pos(), BuildTablePath(Ctx.GetPrefixPath(context.ServiceId, context.Cluster), objectId), "EXTERNAL_DATA_SOURCE", {}, context));
break;
}
+ case TRule_sql_stmt_core::kAltSqlStmtCore31: {
+ // create_replication_stmt: CREATE ASYNC REPLICATION
+ auto& node = core.GetAlt_sql_stmt_core31().GetRule_create_replication_stmt1();
+ TObjectOperatorContext context(Ctx.Scoped);
+ if (node.GetRule_object_ref4().HasBlock1()) {
+ const auto& cluster = node.GetRule_object_ref4().GetBlock1().GetRule_cluster_expr1();
+ if (!ClusterExpr(cluster, false, context.ServiceId, context.Cluster)) {
+ return false;
+ }
+ }
+
+ std::vector<std::pair<TString, TString>> targets;
+ if (!AsyncReplicationTarget(targets, node.GetRule_replication_target6(), *this)) {
+ return false;
+ }
+ for (auto& block : node.GetBlock7()) {
+ if (!AsyncReplicationTarget(targets, block.GetRule_replication_target2(), *this)) {
+ return false;
+ }
+ }
+
+ std::map<TString, TNodePtr> settings;
+ if (!AsyncReplicationSettings(settings, node.GetRule_replication_settings10(), *this)) {
+ return false;
+ }
+
+ const TString id = Id(node.GetRule_object_ref4().GetRule_id_or_at2(), *this).second;
+ AddStatementToBlocks(blocks, BuildCreateAsyncReplication(Ctx.Pos(), id, std::move(targets), std::move(settings), context));
+ break;
+ }
+ case TRule_sql_stmt_core::kAltSqlStmtCore32: {
+ // drop_replication_stmt: DROP ASYNC REPLICATION
+ auto& node = core.GetAlt_sql_stmt_core32().GetRule_drop_replication_stmt1();
+ TObjectOperatorContext context(Ctx.Scoped);
+ if (node.GetRule_object_ref4().HasBlock1()) {
+ const auto& cluster = node.GetRule_object_ref4().GetBlock1().GetRule_cluster_expr1();
+ if (!ClusterExpr(cluster, false, context.ServiceId, context.Cluster)) {
+ return false;
+ }
+ }
+
+ const TString id = Id(node.GetRule_object_ref4().GetRule_id_or_at2(), *this).second;
+ AddStatementToBlocks(blocks, BuildDropAsyncReplication(Ctx.Pos(), id, node.HasBlock5(), context));
+ break;
+ }
default:
Ctx.IncrementMonCounter("sql_errors", "UnknownStatement" + internalStatementName);
AltNotImplemented("sql_stmt_core", core);
diff --git a/ydb/library/yql/sql/v1/sql_ut.cpp b/ydb/library/yql/sql/v1/sql_ut.cpp
index 50a336c161..a9bf77cb2b 100644
--- a/ydb/library/yql/sql/v1/sql_ut.cpp
+++ b/ydb/library/yql/sql/v1/sql_ut.cpp
@@ -269,6 +269,11 @@ Y_UNIT_TEST_SUITE(SqlParsingOnly) {
UNIT_ASSERT(SqlToYql("USE plato; SELECT CHANGEFEED FROM CHANGEFEED").IsOk());
}
+ Y_UNIT_TEST(ReplicationKeywordNotReservedForNames) {
+ UNIT_ASSERT(SqlToYql("USE plato; CREATE TABLE REPLICATION (REPLICATION Uint32, PRIMARY KEY (REPLICATION));").IsOk());
+ UNIT_ASSERT(SqlToYql("USE plato; SELECT REPLICATION FROM REPLICATION").IsOk());
+ }
+
Y_UNIT_TEST(Jubilee) {
NYql::TAstParseResult res = SqlToYql("USE plato; INSERT INTO Arcadia (r2000000) VALUES (\"2M GET!!!\");");
UNIT_ASSERT(res.Root);
@@ -4834,4 +4839,80 @@ Y_UNIT_TEST_SUITE(ExternalDeclares) {
UNIT_ASSERT_VALUES_EQUAL(1, elementStat["Write"]);
}
+
+ Y_UNIT_TEST(CreateAsyncReplicationParseCorrect) {
+ auto req = R"(
+ USE plato;
+ CREATE ASYNC REPLICATION MyReplication
+ FOR table1 AS table2, table3 AS table4
+ WITH (
+ ENDPOINT = "localhost:2135",
+ DATABASE = "/MyDatabase"
+ );
+ )";
+ auto res = SqlToYql(req);
+ UNIT_ASSERT(res.Root);
+
+ TVerifyLineFunc verifyLine = [](const TString& word, const TString& line) {
+ if (word == "Write") {
+ UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("MyReplication"));
+ UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("createAsyncReplication"));
+ UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("table1"));
+ UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("table2"));
+ UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("table3"));
+ UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("table4"));
+ UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("ENDPOINT"));
+ UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("localhost:2135"));
+ UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("DATABASE"));
+ UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("/MyDatabase"));
+ }
+ };
+
+ TWordCountHive elementStat = { {TString("Write"), 0}};
+ VerifyProgram(res, elementStat, verifyLine);
+
+ UNIT_ASSERT_VALUES_EQUAL(1, elementStat["Write"]);
+ }
+
+ Y_UNIT_TEST(DropAsyncReplicationParseCorrect) {
+ auto req = R"(
+ USE plato;
+ DROP ASYNC REPLICATION MyReplication;
+ )";
+ auto res = SqlToYql(req);
+ UNIT_ASSERT(res.Root);
+
+ TVerifyLineFunc verifyLine = [](const TString& word, const TString& line) {
+ if (word == "Write") {
+ UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("MyReplication"));
+ UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("dropAsyncReplication"));
+ UNIT_ASSERT_VALUES_EQUAL(TString::npos, line.find("cascade"));
+ }
+ };
+
+ TWordCountHive elementStat = { {TString("Write"), 0}};
+ VerifyProgram(res, elementStat, verifyLine);
+
+ UNIT_ASSERT_VALUES_EQUAL(1, elementStat["Write"]);
+ }
+
+ Y_UNIT_TEST(DropAsyncReplicationCascade) {
+ auto req = R"(
+ USE plato;
+ DROP ASYNC REPLICATION MyReplication CASCADE;
+ )";
+ auto res = SqlToYql(req);
+ UNIT_ASSERT(res.Root);
+
+ TVerifyLineFunc verifyLine = [](const TString& word, const TString& line) {
+ if (word == "Write") {
+ UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("cascade"));
+ }
+ };
+
+ TWordCountHive elementStat = { {TString("Write"), 0}};
+ VerifyProgram(res, elementStat, verifyLine);
+
+ UNIT_ASSERT_VALUES_EQUAL(1, elementStat["Write"]);
+ }
}