aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorIlnaz Nizametdinov <ilnaz@ydb.tech>2024-04-22 19:09:52 +0300
committerGitHub <noreply@github.com>2024-04-22 19:09:52 +0300
commit111c0526369cf055135cfc885ab64cc9f9e2bd76 (patch)
treee1c374abd5327c88ebdb64d76351cbe241b6e142
parentc05d352c297779ce0993d3675c1e84dc704f4a5e (diff)
downloadydb-111c0526369cf055135cfc885ab64cc9f9e2bd76.tar.gz
{CREATE,ALTER,DROP} ASYNC REPLICATION impl (#3904)
-rw-r--r--ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp18
-rw-r--r--ydb/core/kqp/gateway/kqp_ic_gateway.cpp12
-rw-r--r--ydb/core/kqp/host/kqp_gateway_proxy.cpp168
-rw-r--r--ydb/core/kqp/host/ya.make1
-rw-r--r--ydb/core/kqp/provider/yql_kikimr_datasink.cpp84
-rw-r--r--ydb/core/kqp/provider/yql_kikimr_datasource.cpp2
-rw-r--r--ydb/core/kqp/provider/yql_kikimr_exec.cpp130
-rw-r--r--ydb/core/kqp/provider/yql_kikimr_expr_nodes.json36
-rw-r--r--ydb/core/kqp/provider/yql_kikimr_gateway.h64
-rw-r--r--ydb/core/kqp/provider/yql_kikimr_provider.cpp14
-rw-r--r--ydb/core/kqp/provider/yql_kikimr_provider.h5
-rw-r--r--ydb/core/kqp/provider/yql_kikimr_provider_impl.h13
-rw-r--r--ydb/core/kqp/provider/yql_kikimr_type_ann.cpp64
-rw-r--r--ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp252
-rw-r--r--ydb/core/protos/kqp_physical.proto3
-rw-r--r--ydb/library/yql/core/expr_nodes/yql_expr_nodes.json13
-rw-r--r--ydb/library/yql/providers/common/provider/yql_provider.cpp64
-rw-r--r--ydb/library/yql/providers/common/provider/yql_provider.h12
18 files changed, 952 insertions, 3 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp b/ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp
index 89216e58d3..11db54b315 100644
--- a/ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp
@@ -247,6 +247,24 @@ public:
break;
}
+ case NKqpProto::TKqpSchemeOperation::kCreateReplication: {
+ const auto& modifyScheme = schemeOp.GetCreateReplication();
+ ev->Record.MutableTransaction()->MutableModifyScheme()->CopyFrom(modifyScheme);
+ break;
+ }
+
+ case NKqpProto::TKqpSchemeOperation::kAlterReplication: {
+ const auto& modifyScheme = schemeOp.GetAlterReplication();
+ ev->Record.MutableTransaction()->MutableModifyScheme()->CopyFrom(modifyScheme);
+ break;
+ }
+
+ case NKqpProto::TKqpSchemeOperation::kDropReplication: {
+ const auto& modifyScheme = schemeOp.GetDropReplication();
+ ev->Record.MutableTransaction()->MutableModifyScheme()->CopyFrom(modifyScheme);
+ break;
+ }
+
default:
InternalError(TStringBuilder() << "Unexpected scheme operation: "
<< (ui32) schemeOp.GetOperationCase());
diff --git a/ydb/core/kqp/gateway/kqp_ic_gateway.cpp b/ydb/core/kqp/gateway/kqp_ic_gateway.cpp
index 1feaa2de98..d2b8e66065 100644
--- a/ydb/core/kqp/gateway/kqp_ic_gateway.cpp
+++ b/ydb/core/kqp/gateway/kqp_ic_gateway.cpp
@@ -1011,6 +1011,18 @@ public:
}
}
+ TFuture<TGenericResult> CreateReplication(const TString&, const NYql::TCreateReplicationSettings&) override {
+ return NotImplemented<TGenericResult>();
+ }
+
+ TFuture<TGenericResult> AlterReplication(const TString&, const NYql::TAlterReplicationSettings&) override {
+ return NotImplemented<TGenericResult>();
+ }
+
+ TFuture<TGenericResult> DropReplication(const TString&, const NYql::TDropReplicationSettings&) override {
+ return NotImplemented<TGenericResult>();
+ }
+
TFuture<TGenericResult> AlterColumnTable(const TString& cluster,
const NYql::TAlterColumnTableSettings& settings) override {
Y_UNUSED(cluster);
diff --git a/ydb/core/kqp/host/kqp_gateway_proxy.cpp b/ydb/core/kqp/host/kqp_gateway_proxy.cpp
index 366464d3ed..0b9f4ade14 100644
--- a/ydb/core/kqp/host/kqp_gateway_proxy.cpp
+++ b/ydb/core/kqp/host/kqp_gateway_proxy.cpp
@@ -2,9 +2,11 @@
#include <ydb/core/grpc_services/table_settings.h>
#include <ydb/core/kqp/gateway/utils/scheme_helpers.h>
+#include <ydb/core/protos/replication.pb.h>
#include <ydb/core/ydb_convert/table_description.h>
#include <ydb/core/ydb_convert/column_families.h>
#include <ydb/core/ydb_convert/ydb_convert.h>
+#include <ydb/public/sdk/cpp/client/impl/ydb_internal/common/parser.h>
#include <ydb/services/metadata/abstract/kqp_common.h>
namespace NKikimr::NKqp {
@@ -1725,6 +1727,172 @@ public:
}
}
+ static TString AdjustPath(const TString& path, const TString& database) {
+ if (path.StartsWith('/')) {
+ if (!path.StartsWith(database)) {
+ throw yexception() << "Path '" << path << "' not in database '" << database << "'";
+ }
+ return path;
+ } else {
+ return database + '/' + path;
+ }
+ }
+
+ TFuture<TGenericResult> CreateReplication(const TString& cluster, const NYql::TCreateReplicationSettings& settings) override {
+ CHECK_PREPARED_DDL(CreateReplication);
+
+ try {
+ if (cluster != SessionCtx->GetCluster()) {
+ return MakeFuture(ResultFromError<TGenericResult>("Invalid cluster: " + cluster));
+ }
+
+ TString error;
+ std::pair<TString, TString> pathPair;
+ if (!NSchemeHelpers::SplitTablePath(settings.Name, GetDatabase(), pathPair, error, true)) {
+ return MakeFuture(ResultFromError<TGenericResult>(error));
+ }
+
+ NKikimrSchemeOp::TModifyScheme tx;
+ tx.SetWorkingDir(pathPair.first);
+ tx.SetOperationType(NKikimrSchemeOp::ESchemeOpCreateReplication);
+
+ auto& op = *tx.MutableReplication();
+ op.SetName(pathPair.second);
+
+ auto& config = *op.MutableConfig();
+ auto& params = *config.MutableSrcConnectionParams();
+ if (const auto& connectionString = settings.Settings.ConnectionString) {
+ const auto parseResult = NYdb::ParseConnectionString(*connectionString);
+ params.SetEndpoint(parseResult.Endpoint);
+ params.SetDatabase(parseResult.Database);
+ }
+ if (const auto& endpoint = settings.Settings.Endpoint) {
+ params.SetEndpoint(*endpoint);
+ }
+ if (const auto& database = settings.Settings.Database) {
+ params.SetDatabase(*database);
+ }
+ if (const auto& token = settings.Settings.OAuthToken) {
+ params.SetOAuthToken(*token);
+ }
+ if (const auto& creds = settings.Settings.StaticCredentials) {
+ params.MutableStaticCredentials()->SetUser(creds->UserName);
+ params.MutableStaticCredentials()->SetPassword(creds->Password);
+ }
+
+ auto& targets = *config.MutableSpecific();
+ for (const auto& [src, dst] : settings.Targets) {
+ auto& target = *targets.AddTargets();
+ target.SetSrcPath(AdjustPath(src, params.GetDatabase()));
+ target.SetDstPath(AdjustPath(dst, GetDatabase()));
+ }
+
+ if (IsPrepare()) {
+ auto& phyQuery = *SessionCtx->Query().PreparingQuery->MutablePhysicalQuery();
+ auto& phyTx = *phyQuery.AddTransactions();
+ phyTx.SetType(NKqpProto::TKqpPhyTx::TYPE_SCHEME);
+ phyTx.MutableSchemeOperation()->MutableCreateReplication()->Swap(&tx);
+
+ TGenericResult result;
+ result.SetSuccess();
+ return MakeFuture(result);
+ } else {
+ return Gateway->ModifyScheme(std::move(tx));
+ }
+ }
+ catch (yexception& e) {
+ return MakeFuture(ResultFromException<TGenericResult>(e));
+ }
+ }
+
+ TFuture<TGenericResult> AlterReplication(const TString& cluster, const NYql::TAlterReplicationSettings& settings) override {
+ CHECK_PREPARED_DDL(AlterReplication);
+
+ try {
+ if (cluster != SessionCtx->GetCluster()) {
+ return MakeFuture(ResultFromError<TGenericResult>("Invalid cluster: " + cluster));
+ }
+
+ std::pair<TString, TString> pathPair;
+ {
+ TString error;
+ if (!NSchemeHelpers::SplitTablePath(settings.Name, GetDatabase(), pathPair, error, false)) {
+ return MakeFuture(ResultFromError<TGenericResult>(error));
+ }
+ }
+
+ NKikimrSchemeOp::TModifyScheme tx;
+ tx.SetWorkingDir(pathPair.first);
+ tx.SetOperationType(NKikimrSchemeOp::ESchemeOpAlterReplication);
+
+ auto& op = *tx.MutableAlterReplication();
+ op.SetName(pathPair.second);
+
+ auto& state = *op.MutableState();
+ if (const auto& done = settings.Settings.StateDone) {
+ state.MutableDone()->SetFailoverMode(
+ static_cast<NKikimrReplication::TReplicationState::TDone::EFailoverMode>(done->FailoverMode));
+ }
+
+ if (IsPrepare()) {
+ auto& phyQuery = *SessionCtx->Query().PreparingQuery->MutablePhysicalQuery();
+ auto& phyTx = *phyQuery.AddTransactions();
+ phyTx.SetType(NKqpProto::TKqpPhyTx::TYPE_SCHEME);
+ phyTx.MutableSchemeOperation()->MutableAlterReplication()->Swap(&tx);
+
+ TGenericResult result;
+ result.SetSuccess();
+ return MakeFuture(result);
+ } else {
+ return Gateway->ModifyScheme(std::move(tx));
+ }
+ }
+ catch (yexception& e) {
+ return MakeFuture(ResultFromException<TGenericResult>(e));
+ }
+ }
+
+ TFuture<TGenericResult> DropReplication(const TString& cluster, const NYql::TDropReplicationSettings& settings) override {
+ CHECK_PREPARED_DDL(DropReplication);
+
+ try {
+ if (cluster != SessionCtx->GetCluster()) {
+ return MakeFuture(ResultFromError<TGenericResult>("Invalid cluster: " + cluster));
+ }
+
+ std::pair<TString, TString> pathPair;
+ {
+ TString error;
+ if (!NSchemeHelpers::SplitTablePath(settings.Name, GetDatabase(), pathPair, error, false)) {
+ return MakeFuture(ResultFromError<TGenericResult>(error));
+ }
+ }
+
+ NKikimrSchemeOp::TModifyScheme tx;
+ tx.SetWorkingDir(pathPair.first);
+ tx.SetOperationType(NKikimrSchemeOp::ESchemeOpDropReplication); // TODO(ilnaz): CASCADE
+
+ auto& op = *tx.MutableDrop();
+ op.SetName(pathPair.second);
+
+ if (IsPrepare()) {
+ auto& phyQuery = *SessionCtx->Query().PreparingQuery->MutablePhysicalQuery();
+ auto& phyTx = *phyQuery.AddTransactions();
+ phyTx.SetType(NKqpProto::TKqpPhyTx::TYPE_SCHEME);
+ phyTx.MutableSchemeOperation()->MutableDropReplication()->Swap(&tx);
+
+ TGenericResult result;
+ result.SetSuccess();
+ return MakeFuture(result);
+ } else {
+ return Gateway->ModifyScheme(std::move(tx));
+ }
+ }
+ catch (yexception& e) {
+ return MakeFuture(ResultFromException<TGenericResult>(e));
+ }
+ }
+
TVector<NKikimrKqp::TKqpTableMetadataProto> GetCollectedSchemeData() override {
return Gateway->GetCollectedSchemeData();
}
diff --git a/ydb/core/kqp/host/ya.make b/ydb/core/kqp/host/ya.make
index 66a73f2f2a..8b9980f176 100644
--- a/ydb/core/kqp/host/ya.make
+++ b/ydb/core/kqp/host/ya.make
@@ -31,6 +31,7 @@ PEERDIR(
ydb/library/yql/providers/result/provider
ydb/library/yql/providers/s3/provider
ydb/library/yql/providers/pg/provider
+ ydb/public/sdk/cpp/client/impl/ydb_internal/common
)
YQL_LAST_ABI_VERSION()
diff --git a/ydb/core/kqp/provider/yql_kikimr_datasink.cpp b/ydb/core/kqp/provider/yql_kikimr_datasink.cpp
index bef8363de3..fe67a1607b 100644
--- a/ydb/core/kqp/provider/yql_kikimr_datasink.cpp
+++ b/ydb/core/kqp/provider/yql_kikimr_datasink.cpp
@@ -97,6 +97,24 @@ private:
return TStatus::Ok;
}
+ TStatus HandleCreateReplication(TKiCreateReplication node, TExprContext& ctx) override {
+ Y_UNUSED(ctx);
+ Y_UNUSED(node);
+ return TStatus::Ok;
+ }
+
+ TStatus HandleAlterReplication(TKiAlterReplication node, TExprContext& ctx) override {
+ Y_UNUSED(ctx);
+ Y_UNUSED(node);
+ return TStatus::Ok;
+ }
+
+ TStatus HandleDropReplication(TKiDropReplication node, TExprContext& ctx) override {
+ Y_UNUSED(ctx);
+ Y_UNUSED(node);
+ return TStatus::Ok;
+ }
+
TStatus HandleCreateSequence(NNodes::TKiCreateSequence node, TExprContext& ctx) override {
Y_UNUSED(ctx);
Y_UNUSED(node);
@@ -314,6 +332,8 @@ private:
return TStatus::Ok;
case TKikimrKey::Type::PGObject:
return TStatus::Ok;
+ case TKikimrKey::Type::Replication:
+ return TStatus::Ok;
}
ctx.AddError(TIssue(ctx.GetPosition(node.Pos()), "Invalid table key type."));
@@ -479,6 +499,7 @@ public:
|| node.IsCallable(TKiAlterTable::CallableName())) {
return true;
}
+
if (node.IsCallable(TKiCreateTopic::CallableName())
|| node.IsCallable(TKiAlterTopic::CallableName())
|| node.IsCallable(TKiDropTopic::CallableName())
@@ -486,6 +507,13 @@ public:
return true;
}
+ if (node.IsCallable(TKiCreateReplication::CallableName())
+ || node.IsCallable(TKiAlterReplication::CallableName())
+ || node.IsCallable(TKiDropReplication::CallableName())
+ ) {
+ return true;
+ }
+
if (node.IsCallable(TKiCreateSequence::CallableName())
|| node.IsCallable(TKiDropSequence::CallableName())) {
return true;
@@ -1185,6 +1213,48 @@ public:
} else {
YQL_ENSURE(false, "unknown PGObject with type: \"" << key.GetPGObjectType() << "\"");
}
+ break;
+ }
+
+ case TKikimrKey::Type::Replication: {
+ auto settings = NCommon::ParseWriteReplicationSettings(TExprList(node->Child(4)), ctx);
+ YQL_ENSURE(settings.Mode);
+ auto mode = settings.Mode.Cast();
+
+ if (mode == "create") {
+ return Build<TKiCreateReplication>(ctx, node->Pos())
+ .World(node->Child(0))
+ .DataSink(node->Child(1))
+ .Replication().Build(key.GetReplicationPath())
+ .Targets(settings.Targets.Cast())
+ .ReplicationSettings(settings.ReplicationSettings.Cast())
+ .Settings(settings.Other)
+ .Done()
+ .Ptr();
+ } else if (mode == "alter") {
+ return Build<TKiAlterReplication>(ctx, node->Pos())
+ .World(node->Child(0))
+ .DataSink(node->Child(1))
+ .Replication().Build(key.GetReplicationPath())
+ .ReplicationSettings(settings.ReplicationSettings.Cast())
+ .Settings(settings.Other)
+ .Done()
+ .Ptr();
+ } else if (mode == "drop" || mode == "dropCascade") {
+ return Build<TKiDropReplication>(ctx, node->Pos())
+ .World(node->Child(0))
+ .DataSink(node->Child(1))
+ .Replication().Build(key.GetReplicationPath())
+ .Cascade<TCoAtom>()
+ .Value(mode == "dropCascade")
+ .Build()
+ .Done()
+ .Ptr();
+ } else {
+ ctx.AddError(TIssue(ctx.GetPosition(node->Pos()), "Unknown operation type for replication"));
+ return nullptr;
+ }
+ break;
}
}
@@ -1276,6 +1346,18 @@ IGraphTransformer::TStatus TKiSinkVisitorTransformer::DoTransform(TExprNode::TPt
return HandleDropTopic(node.Cast(), ctx);
}
+ if (auto node = TMaybeNode<TKiCreateReplication>(input)) {
+ return HandleCreateReplication(node.Cast(), ctx);
+ }
+
+ if (auto node = TMaybeNode<TKiAlterReplication>(input)) {
+ return HandleAlterReplication(node.Cast(), ctx);
+ }
+
+ if (auto node = TMaybeNode<TKiDropReplication>(input)) {
+ return HandleDropReplication(node.Cast(), ctx);
+ }
+
if (auto node = TMaybeNode<TKiUpsertObject>(input)) {
return HandleUpsertObject(node.Cast(), ctx);
}
@@ -1324,7 +1406,7 @@ IGraphTransformer::TStatus TKiSinkVisitorTransformer::DoTransform(TExprNode::TPt
return HandleDropGroup(node.Cast(), ctx);
}
- if(auto node = TMaybeNode<TPgDropObject>(input)) {
+ if (auto node = TMaybeNode<TPgDropObject>(input)) {
return HandlePgDropObject(node.Cast(), ctx);
}
diff --git a/ydb/core/kqp/provider/yql_kikimr_datasource.cpp b/ydb/core/kqp/provider/yql_kikimr_datasource.cpp
index 92ef2d6a17..1a1098892a 100644
--- a/ydb/core/kqp/provider/yql_kikimr_datasource.cpp
+++ b/ydb/core/kqp/provider/yql_kikimr_datasource.cpp
@@ -188,6 +188,8 @@ private:
return TStatus::Ok;
case TKikimrKey::Type::PGObject:
return TStatus::Ok;
+ case TKikimrKey::Type::Replication:
+ return TStatus::Ok;
}
return TStatus::Error;
diff --git a/ydb/core/kqp/provider/yql_kikimr_exec.cpp b/ydb/core/kqp/provider/yql_kikimr_exec.cpp
index cffbe19319..634cd38a5d 100644
--- a/ydb/core/kqp/provider/yql_kikimr_exec.cpp
+++ b/ydb/core/kqp/provider/yql_kikimr_exec.cpp
@@ -1784,6 +1784,136 @@ public:
}, "Executing DROP TOPIC");
}
+ if (auto maybeCreateReplication = TMaybeNode<TKiCreateReplication>(input)) {
+ auto requireStatus = RequireChild(*input, 0);
+ if (requireStatus.Level != TStatus::Ok) {
+ return SyncStatus(requireStatus);
+ }
+
+ auto createReplication = maybeCreateReplication.Cast();
+
+ TCreateReplicationSettings settings;
+ settings.Name = TString(createReplication.Replication());
+
+ for (auto target : createReplication.Targets()) {
+ settings.Targets.emplace_back(
+ target.RemotePath().Cast<TCoAtom>().StringValue(),
+ target.LocalPath().Cast<TCoAtom>().StringValue()
+ );
+ }
+
+ for (auto setting : createReplication.ReplicationSettings()) {
+ auto name = setting.Name().Value();
+ if (name == "connection_string") {
+ settings.Settings.ConnectionString = setting.Value().Cast<TCoDataCtor>().Literal().Cast<TCoAtom>().Value();
+ } else if (name == "endpoint") {
+ settings.Settings.Endpoint = setting.Value().Cast<TCoDataCtor>().Literal().Cast<TCoAtom>().Value();
+ } else if (name == "database") {
+ settings.Settings.Database = setting.Value().Cast<TCoDataCtor>().Literal().Cast<TCoAtom>().Value();
+ } else if (name == "token") {
+ settings.Settings.OAuthToken = setting.Value().Cast<TCoDataCtor>().Literal().Cast<TCoAtom>().Value();
+ } else if (name == "user") {
+ settings.Settings.EnsureStaticCredentials().UserName =
+ setting.Value().Cast<TCoDataCtor>().Literal().Cast<TCoAtom>().Value();
+ } else if (name == "password") {
+ settings.Settings.EnsureStaticCredentials().Password =
+ setting.Value().Cast<TCoDataCtor>().Literal().Cast<TCoAtom>().Value();
+ }
+ }
+
+ if (settings.Settings.ConnectionString && (settings.Settings.Endpoint || settings.Settings.Database)) {
+ ctx.AddError(TIssue(ctx.GetPosition(createReplication.Pos()),
+ TStringBuilder() << "Connection string and Endpoint/Database are mutually exclusive"));
+ return SyncError();
+ }
+
+ if (settings.Settings.OAuthToken && settings.Settings.StaticCredentials) {
+ ctx.AddError(TIssue(ctx.GetPosition(createReplication.Pos()),
+ TStringBuilder() << "Token and User/Password are mutually exclusive"));
+ return SyncError();
+ }
+
+ auto cluster = TString(createReplication.DataSink().Cluster());
+ auto future = Gateway->CreateReplication(cluster, settings);
+
+ return WrapFuture(future,
+ [](const IKikimrGateway::TGenericResult& res, const TExprNode::TPtr& input, TExprContext& ctx) {
+ Y_UNUSED(res);
+ auto resultNode = ctx.NewWorld(input->Pos());
+ return resultNode;
+ }, "Executing CREATE ASYNC REPLICATION");
+ }
+
+ if (auto maybeAlterReplication = TMaybeNode<TKiAlterReplication>(input)) {
+ auto requireStatus = RequireChild(*input, 0);
+ if (requireStatus.Level != TStatus::Ok) {
+ return SyncStatus(requireStatus);
+ }
+
+ auto alterReplication = maybeAlterReplication.Cast();
+
+ TAlterReplicationSettings settings;
+ settings.Name = TString(alterReplication.Replication());
+
+ for (auto setting : alterReplication.ReplicationSettings()) {
+ auto name = setting.Name().Value();
+ if (name == "state") {
+ auto value = ToString(setting.Value().Cast<TCoDataCtor>().Literal().Cast<TCoAtom>().Value());
+ if (to_lower(value) == "done") {
+ settings.Settings.EnsureStateDone();
+ } else {
+ ctx.AddError(TIssue(ctx.GetPosition(setting.Name().Pos()),
+ TStringBuilder() << "Unknown replication state: " << value));
+ return SyncError();
+ }
+ } else if (name == "failover_mode") {
+ auto value = ToString(setting.Value().Cast<TCoDataCtor>().Literal().Cast<TCoAtom>().Value());
+ if (to_lower(value) == "consistent") {
+ settings.Settings.EnsureStateDone(TReplicationSettings::EFailoverMode::Consistent);
+ } else if (to_lower(value) == "force") {
+ settings.Settings.EnsureStateDone(TReplicationSettings::EFailoverMode::Force);
+ } else {
+ ctx.AddError(TIssue(ctx.GetPosition(setting.Name().Pos()),
+ TStringBuilder() << "Unknown failover mode: " << value));
+ return SyncError();
+ }
+ }
+ }
+
+ auto cluster = TString(alterReplication.DataSink().Cluster());
+ auto future = Gateway->AlterReplication(cluster, settings);
+
+ return WrapFuture(future,
+ [](const IKikimrGateway::TGenericResult& res, const TExprNode::TPtr& input, TExprContext& ctx) {
+ Y_UNUSED(res);
+ auto resultNode = ctx.NewWorld(input->Pos());
+ return resultNode;
+ }, "Executing ALTER ASYNC REPLICATION");
+ }
+
+ if (auto maybeDropReplication = TMaybeNode<TKiDropReplication>(input)) {
+ auto requireStatus = RequireChild(*input, 0);
+ if (requireStatus.Level != TStatus::Ok) {
+ return SyncStatus(requireStatus);
+ }
+
+ auto dropReplication = maybeDropReplication.Cast();
+
+ TDropReplicationSettings settings;
+ settings.Name = TString(dropReplication.Replication());
+ settings.Cascade = (dropReplication.Cascade().Value() == "1");
+
+ auto cluster = TString(dropReplication.DataSink().Cluster());
+ auto future = Gateway->DropReplication(cluster, settings);
+
+ return WrapFuture(future,
+ [](const IKikimrGateway::TGenericResult& res, const TExprNode::TPtr& input, TExprContext& ctx) {
+ Y_UNUSED(res);
+ auto resultNode = ctx.NewWorld(input->Pos());
+ return resultNode;
+ }, "Executing DROP ASYNC REPLICATION");
+ }
+
if (auto maybeGrantPermissions = TMaybeNode<TKiModifyPermissions>(input)) {
auto requireStatus = RequireChild(*input, 0);
if (requireStatus.Level != TStatus::Ok) {
diff --git a/ydb/core/kqp/provider/yql_kikimr_expr_nodes.json b/ydb/core/kqp/provider/yql_kikimr_expr_nodes.json
index 25282e244c..ed4a2c545b 100644
--- a/ydb/core/kqp/provider/yql_kikimr_expr_nodes.json
+++ b/ydb/core/kqp/provider/yql_kikimr_expr_nodes.json
@@ -444,6 +444,42 @@
{"Index": 3, "Name": "Settings", "Type": "TCoNameValueTupleList"},
{"Index": 4, "Name": "MissingOk", "Type": "TCoAtom"}
]
+ },
+ {
+ "Name": "TKiCreateReplication",
+ "Base": "TCallable",
+ "Match": {"Type": "Callable", "Name": "KiCreateReplication!"},
+ "Children": [
+ {"Index": 0, "Name": "World", "Type": "TExprBase"},
+ {"Index": 1, "Name": "DataSink", "Type": "TKiDataSink"},
+ {"Index": 2, "Name": "Replication", "Type": "TCoAtom"},
+ {"Index": 3, "Name": "Targets", "Type": "TCoReplicationTargetList"},
+ {"Index": 4, "Name": "ReplicationSettings", "Type": "TCoNameValueTupleList"},
+ {"Index": 5, "Name": "Settings", "Type": "TCoNameValueTupleList"}
+ ]
+ },
+ {
+ "Name": "TKiAlterReplication",
+ "Base": "TCallable",
+ "Match": {"Type": "Callable", "Name": "KiAlterReplication!"},
+ "Children": [
+ {"Index": 0, "Name": "World", "Type": "TExprBase"},
+ {"Index": 1, "Name": "DataSink", "Type": "TKiDataSink"},
+ {"Index": 2, "Name": "Replication", "Type": "TCoAtom"},
+ {"Index": 3, "Name": "ReplicationSettings", "Type": "TCoNameValueTupleList"},
+ {"Index": 4, "Name": "Settings", "Type": "TCoNameValueTupleList"}
+ ]
+ },
+ {
+ "Name": "TKiDropReplication",
+ "Base": "TCallable",
+ "Match": {"Type": "Callable", "Name": "KiDropReplication!"},
+ "Children": [
+ {"Index": 0, "Name": "World", "Type": "TExprBase"},
+ {"Index": 1, "Name": "DataSink", "Type": "TKiDataSink"},
+ {"Index": 2, "Name": "Replication", "Type": "TCoAtom"},
+ {"Index": 3, "Name": "Cascade", "Type": "TCoAtom"}
+ ]
}
]
}
diff --git a/ydb/core/kqp/provider/yql_kikimr_gateway.h b/ydb/core/kqp/provider/yql_kikimr_gateway.h
index 336e3ae583..4633dbf055 100644
--- a/ydb/core/kqp/provider/yql_kikimr_gateway.h
+++ b/ydb/core/kqp/provider/yql_kikimr_gateway.h
@@ -686,6 +686,64 @@ struct TDropExternalTableSettings {
TString ExternalTable;
};
+struct TReplicationSettings {
+ struct TStateDone {
+ enum class EFailoverMode: ui32 {
+ Consistent = 1,
+ Force = 2,
+ };
+
+ EFailoverMode FailoverMode;
+ };
+
+ struct TStaticCredentials {
+ TString UserName;
+ TString Password;
+ };
+
+ TMaybe<TString> ConnectionString;
+ TMaybe<TString> Endpoint;
+ TMaybe<TString> Database;
+ TMaybe<TString> OAuthToken;
+ TMaybe<TStaticCredentials> StaticCredentials;
+ TMaybe<TStateDone> StateDone;
+
+ TStaticCredentials& EnsureStaticCredentials() {
+ if (!StaticCredentials) {
+ StaticCredentials = TStaticCredentials();
+ }
+
+ return *StaticCredentials;
+ }
+
+ using EFailoverMode = TStateDone::EFailoverMode;
+ TStateDone& EnsureStateDone(EFailoverMode mode = EFailoverMode::Consistent) {
+ if (!StateDone) {
+ StateDone = TStateDone{
+ .FailoverMode = mode,
+ };
+ }
+
+ return *StateDone;
+ }
+};
+
+struct TCreateReplicationSettings {
+ TString Name;
+ TVector<std::pair<TString, TString>> Targets;
+ TReplicationSettings Settings;
+};
+
+struct TAlterReplicationSettings {
+ TString Name;
+ TReplicationSettings Settings;
+};
+
+struct TDropReplicationSettings {
+ TString Name;
+ bool Cascade = false;
+};
+
struct TKikimrListPathItem {
TKikimrListPathItem(TString name, bool isDirectory) {
Name = name;
@@ -838,6 +896,12 @@ public:
virtual NThreading::TFuture<TGenericResult> DropTopic(const TString& cluster, const TString& topic) = 0;
+ virtual NThreading::TFuture<TGenericResult> CreateReplication(const TString& cluster, const TCreateReplicationSettings& settings) = 0;
+
+ virtual NThreading::TFuture<TGenericResult> AlterReplication(const TString& cluster, const TAlterReplicationSettings& settings) = 0;
+
+ virtual NThreading::TFuture<TGenericResult> DropReplication(const TString& cluster, const TDropReplicationSettings& settings) = 0;
+
virtual NThreading::TFuture<TGenericResult> ModifyPermissions(const TString& cluster, const TModifyPermissionsSettings& settings) = 0;
virtual NThreading::TFuture<TGenericResult> CreateUser(const TString& cluster, const TCreateUserSettings& settings) = 0;
diff --git a/ydb/core/kqp/provider/yql_kikimr_provider.cpp b/ydb/core/kqp/provider/yql_kikimr_provider.cpp
index ebe77e85e0..f6406e0867 100644
--- a/ydb/core/kqp/provider/yql_kikimr_provider.cpp
+++ b/ydb/core/kqp/provider/yql_kikimr_provider.cpp
@@ -47,6 +47,9 @@ struct TKikimrData {
DataSinkNames.insert(TKiCreateTopic::CallableName());
DataSinkNames.insert(TKiAlterTopic::CallableName());
DataSinkNames.insert(TKiDropTopic::CallableName());
+ DataSinkNames.insert(TKiCreateReplication::CallableName());
+ DataSinkNames.insert(TKiAlterReplication::CallableName());
+ DataSinkNames.insert(TKiDropReplication::CallableName());
DataSinkNames.insert(TKiCreateUser::CallableName());
DataSinkNames.insert(TKiModifyPermissions::CallableName());
DataSinkNames.insert(TKiAlterUser::CallableName());
@@ -103,6 +106,9 @@ struct TKikimrData {
TYdbOperation::CreateTopic |
TYdbOperation::AlterTopic |
TYdbOperation::DropTopic |
+ TYdbOperation::CreateReplication |
+ TYdbOperation::AlterReplication |
+ TYdbOperation::DropReplication |
TYdbOperation::CreateUser |
TYdbOperation::AlterUser |
TYdbOperation::DropUser |
@@ -407,6 +413,14 @@ bool TKikimrKey::Extract(const TExprNode& key) {
return false;
}
Target = nameNode->Child(0)->Content();
+ } else if (tagName == "replication") {
+ KeyType = Type::Replication;
+ const TExprNode* nameNode = key.Child(0)->Child(1);
+ if (!nameNode->IsCallable("String")) {
+ Ctx.AddError(TIssue(Ctx.GetPosition(key.Pos()), "Expected String as replication key."));
+ return false;
+ }
+ Target = nameNode->Child(0)->Content();
} else if(tagName == "permission") {
KeyType = Type::Permission;
Target = key.Child(0)->Child(1)->Child(0)->Content();
diff --git a/ydb/core/kqp/provider/yql_kikimr_provider.h b/ydb/core/kqp/provider/yql_kikimr_provider.h
index abcabe0c41..764f14774b 100644
--- a/ydb/core/kqp/provider/yql_kikimr_provider.h
+++ b/ydb/core/kqp/provider/yql_kikimr_provider.h
@@ -232,7 +232,10 @@ enum class TYdbOperation : ui32 {
AlterTopic = 1 << 20,
DropTopic = 1 << 21,
ModifyPermission = 1 << 22,
- RenameGroup = 1 << 23
+ RenameGroup = 1 << 23,
+ CreateReplication = 1 << 24,
+ AlterReplication = 1 << 25,
+ DropReplication = 1 << 26,
};
Y_DECLARE_FLAGS(TYdbOperations, TYdbOperation);
diff --git a/ydb/core/kqp/provider/yql_kikimr_provider_impl.h b/ydb/core/kqp/provider/yql_kikimr_provider_impl.h
index f974e26e2a..5f7d3a0341 100644
--- a/ydb/core/kqp/provider/yql_kikimr_provider_impl.h
+++ b/ydb/core/kqp/provider/yql_kikimr_provider_impl.h
@@ -43,6 +43,10 @@ private:
virtual TStatus HandleAlterTopic(NNodes::TKiAlterTopic node, TExprContext& ctx) = 0;
virtual TStatus HandleDropTopic(NNodes::TKiDropTopic node, TExprContext& ctx) = 0;
+ virtual TStatus HandleCreateReplication(NNodes::TKiCreateReplication node, TExprContext& ctx) = 0;
+ virtual TStatus HandleAlterReplication(NNodes::TKiAlterReplication node, TExprContext& ctx) = 0;
+ virtual TStatus HandleDropReplication(NNodes::TKiDropReplication node, TExprContext& ctx) = 0;
+
virtual TStatus HandleCreateUser(NNodes::TKiCreateUser node, TExprContext& ctx) = 0;
virtual TStatus HandleAlterUser(NNodes::TKiAlterUser node, TExprContext& ctx) = 0;
virtual TStatus HandleDropUser(NNodes::TKiDropUser node, TExprContext& ctx) = 0;
@@ -81,7 +85,8 @@ public:
Object,
Topic,
Permission,
- PGObject
+ PGObject,
+ Replication,
};
struct TViewDescription {
@@ -110,6 +115,12 @@ public:
return Target;
}
+ TString GetReplicationPath() const {
+ Y_DEBUG_ABORT_UNLESS(KeyType.Defined());
+ Y_DEBUG_ABORT_UNLESS(KeyType == Type::Replication);
+ return Target;
+ }
+
TString GetFolderPath() const {
Y_DEBUG_ABORT_UNLESS(KeyType.Defined());
Y_DEBUG_ABORT_UNLESS(KeyType == Type::TableList);
diff --git a/ydb/core/kqp/provider/yql_kikimr_type_ann.cpp b/ydb/core/kqp/provider/yql_kikimr_type_ann.cpp
index 0895383d43..84c08955d7 100644
--- a/ydb/core/kqp/provider/yql_kikimr_type_ann.cpp
+++ b/ydb/core/kqp/provider/yql_kikimr_type_ann.cpp
@@ -219,6 +219,10 @@ private:
{
return TStatus::Ok;
}
+ case TKikimrKey::Type::Replication:
+ {
+ return TStatus::Ok;
+ }
}
return TStatus::Error;
@@ -1604,6 +1608,66 @@ virtual TStatus HandleCreateTable(TKiCreateTable create, TExprContext& ctx) over
return TStatus::Ok;
}
+ static bool CheckReplicationSettings(const TCoNameValueTupleList& settings, const THashSet<TString>& supported, TExprContext& ctx) {
+ for (const auto& setting : settings) {
+ auto name = setting.Name().Value();
+ if (!supported.contains(TString(name))) {
+ ctx.AddError(TIssue(ctx.GetPosition(setting.Name().Pos()), TStringBuilder() << "Unsupported setting"
+ << ": " << name));
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+ virtual TStatus HandleCreateReplication(TKiCreateReplication node, TExprContext& ctx) override {
+ const THashSet<TString> supportedSettings = {
+ "connection_string",
+ "endpoint",
+ "database",
+ "token",
+ "user",
+ "password",
+ };
+
+ if (!CheckReplicationSettings(node.ReplicationSettings(), supportedSettings, ctx)) {
+ return TStatus::Error;
+ }
+
+ if (!node.Settings().Empty()) {
+ ctx.AddError(TIssue(ctx.GetPosition(node.Pos()), "Unsupported settings"));
+ return TStatus::Error;
+ }
+
+ node.Ptr()->SetTypeAnn(node.World().Ref().GetTypeAnn());
+ return TStatus::Ok;
+ }
+
+ virtual TStatus HandleAlterReplication(TKiAlterReplication node, TExprContext& ctx) override {
+ const THashSet<TString> supportedSettings = {
+ "state",
+ "failover_mode",
+ };
+
+ if (!CheckReplicationSettings(node.ReplicationSettings(), supportedSettings, ctx)) {
+ return TStatus::Error;
+ }
+
+ if (!node.Settings().Empty()) {
+ ctx.AddError(TIssue(ctx.GetPosition(node.Pos()), "Unsupported settings"));
+ return TStatus::Error;
+ }
+
+ node.Ptr()->SetTypeAnn(node.World().Ref().GetTypeAnn());
+ return TStatus::Ok;
+ }
+
+ virtual TStatus HandleDropReplication(TKiDropReplication node, TExprContext&) override {
+ node.Ptr()->SetTypeAnn(node.World().Ref().GetTypeAnn());
+ return TStatus::Ok;
+ }
+
virtual TStatus HandleModifyPermissions(TKiModifyPermissions node, TExprContext&) override {
node.Ptr()->SetTypeAnn(node.World().Ref().GetTypeAnn());
return TStatus::Ok;
diff --git a/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp b/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp
index c0e84410c0..b6782cd58d 100644
--- a/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp
+++ b/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp
@@ -5176,6 +5176,258 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
auto resultSuccess = session.ExecuteSchemeQuery("DROP EXTERNAL DATA SOURCE test").GetValueSync();
UNIT_ASSERT_C(resultSuccess.GetStatus() == EStatus::SCHEME_ERROR, TStringBuilder{} << resultSuccess.GetStatus() << " " << resultSuccess.GetIssues().ToString());
}
+
+ Y_UNIT_TEST(CreateAsyncReplication) {
+ TKikimrRunner kikimr;
+ auto db = kikimr.GetTableClient();
+ auto session = db.CreateSession().GetValueSync().GetSession();
+
+ // connection string & endpoint/database are mutually exclusive
+ {
+ auto query = R"(
+ --!syntax_v1
+ CREATE ASYNC REPLICATION `/Root/replication` FOR
+ `/Root/table` AS `/Root/replica`
+ WITH (
+ CONNECTION_STRING = "grpc://localhost:2135/?database=/Root"
+ ENDPOINT = "localhost:2135",
+ DATABASE = "/Root"
+ );
+ )";
+
+ const auto result = session.ExecuteSchemeQuery(query).GetValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::GENERIC_ERROR, result.GetIssues().ToString());
+ }
+
+ // token & user/password are mutually exclusive
+ {
+ auto query = R"(
+ --!syntax_v1
+ CREATE ASYNC REPLICATION `/Root/replication` FOR
+ `/Root/table` AS `/Root/replica`
+ WITH (
+ TOKEN = "foo",
+ USER = "user",
+ PASSWORD = "bar"
+ );
+ )";
+
+ const auto result = session.ExecuteSchemeQuery(query).GetValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::GENERIC_ERROR, result.GetIssues().ToString());
+ }
+
+ // unsupported setting (STATE) in CREATE
+ {
+ auto query = R"(
+ --!syntax_v1
+ CREATE ASYNC REPLICATION `/Root/replication` FOR
+ `/Root/table` AS `/Root/replica`
+ WITH (
+ CONNECTION_STRING = "grpc://localhost:2135/?database=/Root",
+ STATE = "DONE"
+ );
+ )";
+
+ const auto result = session.ExecuteSchemeQuery(query).GetValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::GENERIC_ERROR, result.GetIssues().ToString());
+ }
+
+ {
+ auto query = R"(
+ --!syntax_v1
+ CREATE TABLE `/Root/table` (
+ Key Uint64,
+ Value String,
+ PRIMARY KEY (Key)
+ );
+ )";
+
+ auto result = session.ExecuteSchemeQuery(query).GetValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ }
+
+ // ok
+ {
+ auto query = Sprintf(R"(
+ --!syntax_v1
+ CREATE ASYNC REPLICATION `/Root/replication` FOR
+ `/Root/table` AS `/Root/replica`
+ WITH (
+ ENDPOINT = "%s",
+ DATABASE = "/Root",
+ TOKEN = "root@builtin"
+ );
+ )", kikimr.GetEndpoint().c_str());
+
+ const auto result = session.ExecuteSchemeQuery(query).GetValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ }
+ }
+
+ Y_UNIT_TEST(AlterAsyncReplication) {
+ TKikimrRunner kikimr;
+ auto db = kikimr.GetTableClient();
+ auto session = db.CreateSession().GetValueSync().GetSession();
+
+ // path does not exist
+ {
+ auto query = R"(
+ --!syntax_v1
+ ALTER ASYNC REPLICATION `/Root/replication`
+ SET (
+ STATE = "DONE"
+ );
+ )";
+
+ const auto result = session.ExecuteSchemeQuery(query).GetValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SCHEME_ERROR, result.GetIssues().ToString());
+ }
+
+ {
+ auto query = R"(
+ --!syntax_v1
+ CREATE TABLE `/Root/table` (
+ Key Uint64,
+ Value String,
+ PRIMARY KEY (Key)
+ );
+ )";
+
+ auto result = session.ExecuteSchemeQuery(query).GetValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ }
+
+ {
+ auto query = Sprintf(R"(
+ --!syntax_v1
+ CREATE ASYNC REPLICATION `/Root/replication` FOR
+ `/Root/table` AS `/Root/replica`
+ WITH (
+ ENDPOINT = "%s",
+ DATABASE = "/Root",
+ TOKEN = "root@builtin"
+ );
+ )", kikimr.GetEndpoint().c_str());
+
+ const auto result = session.ExecuteSchemeQuery(query).GetValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ }
+
+ // unsupported setting (TOKEN) in ALTER
+ {
+ auto query = R"(
+ --!syntax_v1
+ ALTER ASYNC REPLICATION `/Root/replication`
+ SET (
+ TOKEN = "foo"
+ );
+ )";
+
+ const auto result = session.ExecuteSchemeQuery(query).GetValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::GENERIC_ERROR, result.GetIssues().ToString());
+ }
+
+ // invalid state
+ {
+ auto query = R"(
+ --!syntax_v1
+ ALTER ASYNC REPLICATION `/Root/replication`
+ SET (
+ STATE = "foo"
+ );
+ )";
+
+ const auto result = session.ExecuteSchemeQuery(query).GetValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::GENERIC_ERROR, result.GetIssues().ToString());
+ }
+
+ // invalid failover mode
+ {
+ auto query = R"(
+ --!syntax_v1
+ ALTER ASYNC REPLICATION `/Root/replication`
+ SET (
+ FAILOVER_MODE = "foo"
+ );
+ )";
+
+ const auto result = session.ExecuteSchemeQuery(query).GetValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::GENERIC_ERROR, result.GetIssues().ToString());
+ }
+
+ // ok
+ {
+ auto query = R"(
+ --!syntax_v1
+ ALTER ASYNC REPLICATION `/Root/replication`
+ SET (
+ STATE = "DONE",
+ FAILOVER_MODE = "FORCE"
+ );
+ )";
+
+ const auto result = session.ExecuteSchemeQuery(query).GetValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ }
+ }
+
+ Y_UNIT_TEST(DropAsyncReplication) {
+ TKikimrRunner kikimr;
+ auto db = kikimr.GetTableClient();
+ auto session = db.CreateSession().GetValueSync().GetSession();
+
+ // path does not exist
+ {
+ auto query = R"(
+ --!syntax_v1
+ DROP ASYNC REPLICATION `/Root/replication`
+ )";
+
+ const auto result = session.ExecuteSchemeQuery(query).GetValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SCHEME_ERROR, result.GetIssues().ToString());
+ }
+
+ {
+ auto query = R"(
+ --!syntax_v1
+ CREATE TABLE `/Root/table` (
+ Key Uint64,
+ Value String,
+ PRIMARY KEY (Key)
+ );
+ )";
+
+ auto result = session.ExecuteSchemeQuery(query).GetValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ }
+
+ {
+ auto query = Sprintf(R"(
+ --!syntax_v1
+ CREATE ASYNC REPLICATION `/Root/replication` FOR
+ `/Root/table` AS `/Root/replica`
+ WITH (
+ ENDPOINT = "%s",
+ DATABASE = "/Root",
+ TOKEN = "root@builtin"
+ );
+ )", kikimr.GetEndpoint().c_str());
+
+ const auto result = session.ExecuteSchemeQuery(query).GetValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ }
+
+ // ok
+ {
+ auto query = R"(
+ --!syntax_v1
+ DROP ASYNC REPLICATION `/Root/replication`
+ )";
+
+ const auto result = session.ExecuteSchemeQuery(query).GetValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ }
+ }
}
Y_UNIT_TEST_SUITE(KqpOlapScheme) {
diff --git a/ydb/core/protos/kqp_physical.proto b/ydb/core/protos/kqp_physical.proto
index 62cee353dd..bd13c8d6a5 100644
--- a/ydb/core/protos/kqp_physical.proto
+++ b/ydb/core/protos/kqp_physical.proto
@@ -441,6 +441,9 @@ message TKqpSchemeOperation {
NKikimrSchemeOp.TModifyScheme AlterTableStore = 31;
NKikimrSchemeOp.TModifyScheme CreateSequence = 32;
NKikimrSchemeOp.TModifyScheme DropSequence = 33;
+ NKikimrSchemeOp.TModifyScheme CreateReplication = 34;
+ NKikimrSchemeOp.TModifyScheme AlterReplication = 35;
+ NKikimrSchemeOp.TModifyScheme DropReplication = 36;
}
}
diff --git a/ydb/library/yql/core/expr_nodes/yql_expr_nodes.json b/ydb/library/yql/core/expr_nodes/yql_expr_nodes.json
index 95e76ce023..1da15b8250 100644
--- a/ydb/library/yql/core/expr_nodes/yql_expr_nodes.json
+++ b/ydb/library/yql/core/expr_nodes/yql_expr_nodes.json
@@ -2547,6 +2547,19 @@
"Base": "TCallable",
"Match": {"Type": "Callable", "Name": "ReturningStar"},
"Children": []
+ },
+ {
+ "Name": "TCoReplicationTarget",
+ "Base": "TExprBase",
+ "Match": {"Type": "Tuple"},
+ "Children": [
+ {"Index": 0, "Name": "RemotePath", "Type": "TCoAtom"},
+ {"Index": 1, "Name": "LocalPath", "Type": "TCoAtom"}
+ ]
+ },
+ {
+ "Name": "TCoReplicationTargetList",
+ "ListBase": "TCoReplicationTarget"
}
]
}
diff --git a/ydb/library/yql/providers/common/provider/yql_provider.cpp b/ydb/library/yql/providers/common/provider/yql_provider.cpp
index 3c23ea07ba..ed354a3af4 100644
--- a/ydb/library/yql/providers/common/provider/yql_provider.cpp
+++ b/ydb/library/yql/providers/common/provider/yql_provider.cpp
@@ -525,6 +525,70 @@ TWriteTopicSettings ParseWriteTopicSettings(TExprList node, TExprContext& ctx) {
return ret;
}
+TWriteReplicationSettings ParseWriteReplicationSettings(TExprList node, TExprContext& ctx) {
+ TMaybeNode<TCoAtom> mode;
+ TVector<TCoReplicationTarget> targets;
+ TVector<TCoNameValueTuple> settings;
+ TVector<TCoNameValueTuple> other;
+
+ for (auto child : node) {
+ if (auto maybeTuple = child.Maybe<TCoNameValueTuple>()) {
+ auto tuple = maybeTuple.Cast();
+ auto name = tuple.Name().Value();
+
+ if (name == "mode") {
+ YQL_ENSURE(tuple.Value().Maybe<TCoAtom>());
+ mode = tuple.Value().Cast<TCoAtom>();
+ } else if (name == "targets") {
+ YQL_ENSURE(tuple.Value().Maybe<TExprList>());
+ for (const auto& target : tuple.Value().Cast<TExprList>()) {
+ auto builtTarget = Build<TCoReplicationTarget>(ctx, node.Pos());
+
+ YQL_ENSURE(target.Maybe<TCoNameValueTupleList>());
+ for (const auto& item : target.Cast<TCoNameValueTupleList>()) {
+ auto itemName = item.Name().Value();
+ if (itemName == "remote") {
+ builtTarget.RemotePath(item.Value().Cast<TCoAtom>());
+ } else if (itemName == "local") {
+ builtTarget.LocalPath(item.Value().Cast<TCoAtom>());
+ } else {
+ YQL_ENSURE(false, "unknown target item");
+ }
+ }
+
+ targets.push_back(builtTarget.Done());
+ }
+ } else if (name == "settings") {
+ YQL_ENSURE(tuple.Value().Maybe<TCoNameValueTupleList>());
+ for (const auto& item : tuple.Value().Cast<TCoNameValueTupleList>()) {
+ settings.push_back(item);
+ }
+ } else {
+ other.push_back(tuple);
+ }
+ }
+ }
+
+ const auto& builtTargets = Build<TCoReplicationTargetList>(ctx, node.Pos())
+ .Add(targets)
+ .Done();
+
+ const auto& builtSettings = Build<TCoNameValueTupleList>(ctx, node.Pos())
+ .Add(settings)
+ .Done();
+
+ const auto& builtOther = Build<TCoNameValueTupleList>(ctx, node.Pos())
+ .Add(other)
+ .Done();
+
+ TWriteReplicationSettings ret(builtOther);
+ ret.Mode = mode;
+ ret.Targets = builtTargets;
+ ret.ReplicationSettings = builtSettings;
+
+ return ret;
+}
+
TWriteRoleSettings ParseWriteRoleSettings(TExprList node, TExprContext& ctx) {
TMaybeNode<TCoAtom> mode;
TVector<TCoAtom> roles;
diff --git a/ydb/library/yql/providers/common/provider/yql_provider.h b/ydb/library/yql/providers/common/provider/yql_provider.h
index 7c64417300..fd9eacbeed 100644
--- a/ydb/library/yql/providers/common/provider/yql_provider.h
+++ b/ydb/library/yql/providers/common/provider/yql_provider.h
@@ -83,6 +83,17 @@ struct TWriteTopicSettings {
};
+struct TWriteReplicationSettings {
+ NNodes::TMaybeNode<NNodes::TCoAtom> Mode;
+ NNodes::TMaybeNode<NNodes::TCoReplicationTargetList> Targets;
+ NNodes::TMaybeNode<NNodes::TCoNameValueTupleList> ReplicationSettings;
+ NNodes::TCoNameValueTupleList Other;
+
+ TWriteReplicationSettings(const NNodes::TCoNameValueTupleList& other)
+ : Other(other)
+ {}
+};
+
struct TWriteRoleSettings {
NNodes::TMaybeNode<NNodes::TCoAtom> Mode;
NNodes::TMaybeNode<NNodes::TCoAtomList> Roles;
@@ -153,6 +164,7 @@ TVector<TString> GetResOrPullColumnHints(const TExprNode& node);
TWriteTableSettings ParseWriteTableSettings(NNodes::TExprList node, TExprContext& ctx);
TWriteTopicSettings ParseWriteTopicSettings(NNodes::TExprList node, TExprContext& ctx);
+TWriteReplicationSettings ParseWriteReplicationSettings(NNodes::TExprList node, TExprContext& ctx);
TWriteRoleSettings ParseWriteRoleSettings(NNodes::TExprList node, TExprContext& ctx);
TWriteObjectSettings ParseWriteObjectSettings(NNodes::TExprList node, TExprContext& ctx);