diff options
author | Ilnaz Nizametdinov <ilnaz@ydb.tech> | 2024-04-22 19:09:52 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-04-22 19:09:52 +0300 |
commit | 111c0526369cf055135cfc885ab64cc9f9e2bd76 (patch) | |
tree | e1c374abd5327c88ebdb64d76351cbe241b6e142 | |
parent | c05d352c297779ce0993d3675c1e84dc704f4a5e (diff) | |
download | ydb-111c0526369cf055135cfc885ab64cc9f9e2bd76.tar.gz |
{CREATE,ALTER,DROP} ASYNC REPLICATION impl (#3904)
18 files changed, 952 insertions, 3 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp b/ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp index 89216e58d3..11db54b315 100644 --- a/ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp @@ -247,6 +247,24 @@ public: break; } + case NKqpProto::TKqpSchemeOperation::kCreateReplication: { + const auto& modifyScheme = schemeOp.GetCreateReplication(); + ev->Record.MutableTransaction()->MutableModifyScheme()->CopyFrom(modifyScheme); + break; + } + + case NKqpProto::TKqpSchemeOperation::kAlterReplication: { + const auto& modifyScheme = schemeOp.GetAlterReplication(); + ev->Record.MutableTransaction()->MutableModifyScheme()->CopyFrom(modifyScheme); + break; + } + + case NKqpProto::TKqpSchemeOperation::kDropReplication: { + const auto& modifyScheme = schemeOp.GetDropReplication(); + ev->Record.MutableTransaction()->MutableModifyScheme()->CopyFrom(modifyScheme); + break; + } + default: InternalError(TStringBuilder() << "Unexpected scheme operation: " << (ui32) schemeOp.GetOperationCase()); diff --git a/ydb/core/kqp/gateway/kqp_ic_gateway.cpp b/ydb/core/kqp/gateway/kqp_ic_gateway.cpp index 1feaa2de98..d2b8e66065 100644 --- a/ydb/core/kqp/gateway/kqp_ic_gateway.cpp +++ b/ydb/core/kqp/gateway/kqp_ic_gateway.cpp @@ -1011,6 +1011,18 @@ public: } } + TFuture<TGenericResult> CreateReplication(const TString&, const NYql::TCreateReplicationSettings&) override { + return NotImplemented<TGenericResult>(); + } + + TFuture<TGenericResult> AlterReplication(const TString&, const NYql::TAlterReplicationSettings&) override { + return NotImplemented<TGenericResult>(); + } + + TFuture<TGenericResult> DropReplication(const TString&, const NYql::TDropReplicationSettings&) override { + return NotImplemented<TGenericResult>(); + } + TFuture<TGenericResult> AlterColumnTable(const TString& cluster, const NYql::TAlterColumnTableSettings& settings) override { Y_UNUSED(cluster); diff --git a/ydb/core/kqp/host/kqp_gateway_proxy.cpp b/ydb/core/kqp/host/kqp_gateway_proxy.cpp index 366464d3ed..0b9f4ade14 100644 --- a/ydb/core/kqp/host/kqp_gateway_proxy.cpp +++ b/ydb/core/kqp/host/kqp_gateway_proxy.cpp @@ -2,9 +2,11 @@ #include <ydb/core/grpc_services/table_settings.h> #include <ydb/core/kqp/gateway/utils/scheme_helpers.h> +#include <ydb/core/protos/replication.pb.h> #include <ydb/core/ydb_convert/table_description.h> #include <ydb/core/ydb_convert/column_families.h> #include <ydb/core/ydb_convert/ydb_convert.h> +#include <ydb/public/sdk/cpp/client/impl/ydb_internal/common/parser.h> #include <ydb/services/metadata/abstract/kqp_common.h> namespace NKikimr::NKqp { @@ -1725,6 +1727,172 @@ public: } } + static TString AdjustPath(const TString& path, const TString& database) { + if (path.StartsWith('/')) { + if (!path.StartsWith(database)) { + throw yexception() << "Path '" << path << "' not in database '" << database << "'"; + } + return path; + } else { + return database + '/' + path; + } + } + + TFuture<TGenericResult> CreateReplication(const TString& cluster, const NYql::TCreateReplicationSettings& settings) override { + CHECK_PREPARED_DDL(CreateReplication); + + try { + if (cluster != SessionCtx->GetCluster()) { + return MakeFuture(ResultFromError<TGenericResult>("Invalid cluster: " + cluster)); + } + + TString error; + std::pair<TString, TString> pathPair; + if (!NSchemeHelpers::SplitTablePath(settings.Name, GetDatabase(), pathPair, error, true)) { + return MakeFuture(ResultFromError<TGenericResult>(error)); + } + + NKikimrSchemeOp::TModifyScheme tx; + tx.SetWorkingDir(pathPair.first); + tx.SetOperationType(NKikimrSchemeOp::ESchemeOpCreateReplication); + + auto& op = *tx.MutableReplication(); + op.SetName(pathPair.second); + + auto& config = *op.MutableConfig(); + auto& params = *config.MutableSrcConnectionParams(); + if (const auto& connectionString = settings.Settings.ConnectionString) { + const auto parseResult = NYdb::ParseConnectionString(*connectionString); + params.SetEndpoint(parseResult.Endpoint); + params.SetDatabase(parseResult.Database); + } + if (const auto& endpoint = settings.Settings.Endpoint) { + params.SetEndpoint(*endpoint); + } + if (const auto& database = settings.Settings.Database) { + params.SetDatabase(*database); + } + if (const auto& token = settings.Settings.OAuthToken) { + params.SetOAuthToken(*token); + } + if (const auto& creds = settings.Settings.StaticCredentials) { + params.MutableStaticCredentials()->SetUser(creds->UserName); + params.MutableStaticCredentials()->SetPassword(creds->Password); + } + + auto& targets = *config.MutableSpecific(); + for (const auto& [src, dst] : settings.Targets) { + auto& target = *targets.AddTargets(); + target.SetSrcPath(AdjustPath(src, params.GetDatabase())); + target.SetDstPath(AdjustPath(dst, GetDatabase())); + } + + if (IsPrepare()) { + auto& phyQuery = *SessionCtx->Query().PreparingQuery->MutablePhysicalQuery(); + auto& phyTx = *phyQuery.AddTransactions(); + phyTx.SetType(NKqpProto::TKqpPhyTx::TYPE_SCHEME); + phyTx.MutableSchemeOperation()->MutableCreateReplication()->Swap(&tx); + + TGenericResult result; + result.SetSuccess(); + return MakeFuture(result); + } else { + return Gateway->ModifyScheme(std::move(tx)); + } + } + catch (yexception& e) { + return MakeFuture(ResultFromException<TGenericResult>(e)); + } + } + + TFuture<TGenericResult> AlterReplication(const TString& cluster, const NYql::TAlterReplicationSettings& settings) override { + CHECK_PREPARED_DDL(AlterReplication); + + try { + if (cluster != SessionCtx->GetCluster()) { + return MakeFuture(ResultFromError<TGenericResult>("Invalid cluster: " + cluster)); + } + + std::pair<TString, TString> pathPair; + { + TString error; + if (!NSchemeHelpers::SplitTablePath(settings.Name, GetDatabase(), pathPair, error, false)) { + return MakeFuture(ResultFromError<TGenericResult>(error)); + } + } + + NKikimrSchemeOp::TModifyScheme tx; + tx.SetWorkingDir(pathPair.first); + tx.SetOperationType(NKikimrSchemeOp::ESchemeOpAlterReplication); + + auto& op = *tx.MutableAlterReplication(); + op.SetName(pathPair.second); + + auto& state = *op.MutableState(); + if (const auto& done = settings.Settings.StateDone) { + state.MutableDone()->SetFailoverMode( + static_cast<NKikimrReplication::TReplicationState::TDone::EFailoverMode>(done->FailoverMode)); + } + + if (IsPrepare()) { + auto& phyQuery = *SessionCtx->Query().PreparingQuery->MutablePhysicalQuery(); + auto& phyTx = *phyQuery.AddTransactions(); + phyTx.SetType(NKqpProto::TKqpPhyTx::TYPE_SCHEME); + phyTx.MutableSchemeOperation()->MutableAlterReplication()->Swap(&tx); + + TGenericResult result; + result.SetSuccess(); + return MakeFuture(result); + } else { + return Gateway->ModifyScheme(std::move(tx)); + } + } + catch (yexception& e) { + return MakeFuture(ResultFromException<TGenericResult>(e)); + } + } + + TFuture<TGenericResult> DropReplication(const TString& cluster, const NYql::TDropReplicationSettings& settings) override { + CHECK_PREPARED_DDL(DropReplication); + + try { + if (cluster != SessionCtx->GetCluster()) { + return MakeFuture(ResultFromError<TGenericResult>("Invalid cluster: " + cluster)); + } + + std::pair<TString, TString> pathPair; + { + TString error; + if (!NSchemeHelpers::SplitTablePath(settings.Name, GetDatabase(), pathPair, error, false)) { + return MakeFuture(ResultFromError<TGenericResult>(error)); + } + } + + NKikimrSchemeOp::TModifyScheme tx; + tx.SetWorkingDir(pathPair.first); + tx.SetOperationType(NKikimrSchemeOp::ESchemeOpDropReplication); // TODO(ilnaz): CASCADE + + auto& op = *tx.MutableDrop(); + op.SetName(pathPair.second); + + if (IsPrepare()) { + auto& phyQuery = *SessionCtx->Query().PreparingQuery->MutablePhysicalQuery(); + auto& phyTx = *phyQuery.AddTransactions(); + phyTx.SetType(NKqpProto::TKqpPhyTx::TYPE_SCHEME); + phyTx.MutableSchemeOperation()->MutableDropReplication()->Swap(&tx); + + TGenericResult result; + result.SetSuccess(); + return MakeFuture(result); + } else { + return Gateway->ModifyScheme(std::move(tx)); + } + } + catch (yexception& e) { + return MakeFuture(ResultFromException<TGenericResult>(e)); + } + } + TVector<NKikimrKqp::TKqpTableMetadataProto> GetCollectedSchemeData() override { return Gateway->GetCollectedSchemeData(); } diff --git a/ydb/core/kqp/host/ya.make b/ydb/core/kqp/host/ya.make index 66a73f2f2a..8b9980f176 100644 --- a/ydb/core/kqp/host/ya.make +++ b/ydb/core/kqp/host/ya.make @@ -31,6 +31,7 @@ PEERDIR( ydb/library/yql/providers/result/provider ydb/library/yql/providers/s3/provider ydb/library/yql/providers/pg/provider + ydb/public/sdk/cpp/client/impl/ydb_internal/common ) YQL_LAST_ABI_VERSION() diff --git a/ydb/core/kqp/provider/yql_kikimr_datasink.cpp b/ydb/core/kqp/provider/yql_kikimr_datasink.cpp index bef8363de3..fe67a1607b 100644 --- a/ydb/core/kqp/provider/yql_kikimr_datasink.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_datasink.cpp @@ -97,6 +97,24 @@ private: return TStatus::Ok; } + TStatus HandleCreateReplication(TKiCreateReplication node, TExprContext& ctx) override { + Y_UNUSED(ctx); + Y_UNUSED(node); + return TStatus::Ok; + } + + TStatus HandleAlterReplication(TKiAlterReplication node, TExprContext& ctx) override { + Y_UNUSED(ctx); + Y_UNUSED(node); + return TStatus::Ok; + } + + TStatus HandleDropReplication(TKiDropReplication node, TExprContext& ctx) override { + Y_UNUSED(ctx); + Y_UNUSED(node); + return TStatus::Ok; + } + TStatus HandleCreateSequence(NNodes::TKiCreateSequence node, TExprContext& ctx) override { Y_UNUSED(ctx); Y_UNUSED(node); @@ -314,6 +332,8 @@ private: return TStatus::Ok; case TKikimrKey::Type::PGObject: return TStatus::Ok; + case TKikimrKey::Type::Replication: + return TStatus::Ok; } ctx.AddError(TIssue(ctx.GetPosition(node.Pos()), "Invalid table key type.")); @@ -479,6 +499,7 @@ public: || node.IsCallable(TKiAlterTable::CallableName())) { return true; } + if (node.IsCallable(TKiCreateTopic::CallableName()) || node.IsCallable(TKiAlterTopic::CallableName()) || node.IsCallable(TKiDropTopic::CallableName()) @@ -486,6 +507,13 @@ public: return true; } + if (node.IsCallable(TKiCreateReplication::CallableName()) + || node.IsCallable(TKiAlterReplication::CallableName()) + || node.IsCallable(TKiDropReplication::CallableName()) + ) { + return true; + } + if (node.IsCallable(TKiCreateSequence::CallableName()) || node.IsCallable(TKiDropSequence::CallableName())) { return true; @@ -1185,6 +1213,48 @@ public: } else { YQL_ENSURE(false, "unknown PGObject with type: \"" << key.GetPGObjectType() << "\""); } + break; + } + + case TKikimrKey::Type::Replication: { + auto settings = NCommon::ParseWriteReplicationSettings(TExprList(node->Child(4)), ctx); + YQL_ENSURE(settings.Mode); + auto mode = settings.Mode.Cast(); + + if (mode == "create") { + return Build<TKiCreateReplication>(ctx, node->Pos()) + .World(node->Child(0)) + .DataSink(node->Child(1)) + .Replication().Build(key.GetReplicationPath()) + .Targets(settings.Targets.Cast()) + .ReplicationSettings(settings.ReplicationSettings.Cast()) + .Settings(settings.Other) + .Done() + .Ptr(); + } else if (mode == "alter") { + return Build<TKiAlterReplication>(ctx, node->Pos()) + .World(node->Child(0)) + .DataSink(node->Child(1)) + .Replication().Build(key.GetReplicationPath()) + .ReplicationSettings(settings.ReplicationSettings.Cast()) + .Settings(settings.Other) + .Done() + .Ptr(); + } else if (mode == "drop" || mode == "dropCascade") { + return Build<TKiDropReplication>(ctx, node->Pos()) + .World(node->Child(0)) + .DataSink(node->Child(1)) + .Replication().Build(key.GetReplicationPath()) + .Cascade<TCoAtom>() + .Value(mode == "dropCascade") + .Build() + .Done() + .Ptr(); + } else { + ctx.AddError(TIssue(ctx.GetPosition(node->Pos()), "Unknown operation type for replication")); + return nullptr; + } + break; } } @@ -1276,6 +1346,18 @@ IGraphTransformer::TStatus TKiSinkVisitorTransformer::DoTransform(TExprNode::TPt return HandleDropTopic(node.Cast(), ctx); } + if (auto node = TMaybeNode<TKiCreateReplication>(input)) { + return HandleCreateReplication(node.Cast(), ctx); + } + + if (auto node = TMaybeNode<TKiAlterReplication>(input)) { + return HandleAlterReplication(node.Cast(), ctx); + } + + if (auto node = TMaybeNode<TKiDropReplication>(input)) { + return HandleDropReplication(node.Cast(), ctx); + } + if (auto node = TMaybeNode<TKiUpsertObject>(input)) { return HandleUpsertObject(node.Cast(), ctx); } @@ -1324,7 +1406,7 @@ IGraphTransformer::TStatus TKiSinkVisitorTransformer::DoTransform(TExprNode::TPt return HandleDropGroup(node.Cast(), ctx); } - if(auto node = TMaybeNode<TPgDropObject>(input)) { + if (auto node = TMaybeNode<TPgDropObject>(input)) { return HandlePgDropObject(node.Cast(), ctx); } diff --git a/ydb/core/kqp/provider/yql_kikimr_datasource.cpp b/ydb/core/kqp/provider/yql_kikimr_datasource.cpp index 92ef2d6a17..1a1098892a 100644 --- a/ydb/core/kqp/provider/yql_kikimr_datasource.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_datasource.cpp @@ -188,6 +188,8 @@ private: return TStatus::Ok; case TKikimrKey::Type::PGObject: return TStatus::Ok; + case TKikimrKey::Type::Replication: + return TStatus::Ok; } return TStatus::Error; diff --git a/ydb/core/kqp/provider/yql_kikimr_exec.cpp b/ydb/core/kqp/provider/yql_kikimr_exec.cpp index cffbe19319..634cd38a5d 100644 --- a/ydb/core/kqp/provider/yql_kikimr_exec.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_exec.cpp @@ -1784,6 +1784,136 @@ public: }, "Executing DROP TOPIC"); } + if (auto maybeCreateReplication = TMaybeNode<TKiCreateReplication>(input)) { + auto requireStatus = RequireChild(*input, 0); + if (requireStatus.Level != TStatus::Ok) { + return SyncStatus(requireStatus); + } + + auto createReplication = maybeCreateReplication.Cast(); + + TCreateReplicationSettings settings; + settings.Name = TString(createReplication.Replication()); + + for (auto target : createReplication.Targets()) { + settings.Targets.emplace_back( + target.RemotePath().Cast<TCoAtom>().StringValue(), + target.LocalPath().Cast<TCoAtom>().StringValue() + ); + } + + for (auto setting : createReplication.ReplicationSettings()) { + auto name = setting.Name().Value(); + if (name == "connection_string") { + settings.Settings.ConnectionString = setting.Value().Cast<TCoDataCtor>().Literal().Cast<TCoAtom>().Value(); + } else if (name == "endpoint") { + settings.Settings.Endpoint = setting.Value().Cast<TCoDataCtor>().Literal().Cast<TCoAtom>().Value(); + } else if (name == "database") { + settings.Settings.Database = setting.Value().Cast<TCoDataCtor>().Literal().Cast<TCoAtom>().Value(); + } else if (name == "token") { + settings.Settings.OAuthToken = setting.Value().Cast<TCoDataCtor>().Literal().Cast<TCoAtom>().Value(); + } else if (name == "user") { + settings.Settings.EnsureStaticCredentials().UserName = + setting.Value().Cast<TCoDataCtor>().Literal().Cast<TCoAtom>().Value(); + } else if (name == "password") { + settings.Settings.EnsureStaticCredentials().Password = + setting.Value().Cast<TCoDataCtor>().Literal().Cast<TCoAtom>().Value(); + } + } + + if (settings.Settings.ConnectionString && (settings.Settings.Endpoint || settings.Settings.Database)) { + ctx.AddError(TIssue(ctx.GetPosition(createReplication.Pos()), + TStringBuilder() << "Connection string and Endpoint/Database are mutually exclusive")); + return SyncError(); + } + + if (settings.Settings.OAuthToken && settings.Settings.StaticCredentials) { + ctx.AddError(TIssue(ctx.GetPosition(createReplication.Pos()), + TStringBuilder() << "Token and User/Password are mutually exclusive")); + return SyncError(); + } + + auto cluster = TString(createReplication.DataSink().Cluster()); + auto future = Gateway->CreateReplication(cluster, settings); + + return WrapFuture(future, + [](const IKikimrGateway::TGenericResult& res, const TExprNode::TPtr& input, TExprContext& ctx) { + Y_UNUSED(res); + auto resultNode = ctx.NewWorld(input->Pos()); + return resultNode; + }, "Executing CREATE ASYNC REPLICATION"); + } + + if (auto maybeAlterReplication = TMaybeNode<TKiAlterReplication>(input)) { + auto requireStatus = RequireChild(*input, 0); + if (requireStatus.Level != TStatus::Ok) { + return SyncStatus(requireStatus); + } + + auto alterReplication = maybeAlterReplication.Cast(); + + TAlterReplicationSettings settings; + settings.Name = TString(alterReplication.Replication()); + + for (auto setting : alterReplication.ReplicationSettings()) { + auto name = setting.Name().Value(); + if (name == "state") { + auto value = ToString(setting.Value().Cast<TCoDataCtor>().Literal().Cast<TCoAtom>().Value()); + if (to_lower(value) == "done") { + settings.Settings.EnsureStateDone(); + } else { + ctx.AddError(TIssue(ctx.GetPosition(setting.Name().Pos()), + TStringBuilder() << "Unknown replication state: " << value)); + return SyncError(); + } + } else if (name == "failover_mode") { + auto value = ToString(setting.Value().Cast<TCoDataCtor>().Literal().Cast<TCoAtom>().Value()); + if (to_lower(value) == "consistent") { + settings.Settings.EnsureStateDone(TReplicationSettings::EFailoverMode::Consistent); + } else if (to_lower(value) == "force") { + settings.Settings.EnsureStateDone(TReplicationSettings::EFailoverMode::Force); + } else { + ctx.AddError(TIssue(ctx.GetPosition(setting.Name().Pos()), + TStringBuilder() << "Unknown failover mode: " << value)); + return SyncError(); + } + } + } + + auto cluster = TString(alterReplication.DataSink().Cluster()); + auto future = Gateway->AlterReplication(cluster, settings); + + return WrapFuture(future, + [](const IKikimrGateway::TGenericResult& res, const TExprNode::TPtr& input, TExprContext& ctx) { + Y_UNUSED(res); + auto resultNode = ctx.NewWorld(input->Pos()); + return resultNode; + }, "Executing ALTER ASYNC REPLICATION"); + } + + if (auto maybeDropReplication = TMaybeNode<TKiDropReplication>(input)) { + auto requireStatus = RequireChild(*input, 0); + if (requireStatus.Level != TStatus::Ok) { + return SyncStatus(requireStatus); + } + + auto dropReplication = maybeDropReplication.Cast(); + + TDropReplicationSettings settings; + settings.Name = TString(dropReplication.Replication()); + settings.Cascade = (dropReplication.Cascade().Value() == "1"); + + auto cluster = TString(dropReplication.DataSink().Cluster()); + auto future = Gateway->DropReplication(cluster, settings); + + return WrapFuture(future, + [](const IKikimrGateway::TGenericResult& res, const TExprNode::TPtr& input, TExprContext& ctx) { + Y_UNUSED(res); + auto resultNode = ctx.NewWorld(input->Pos()); + return resultNode; + }, "Executing DROP ASYNC REPLICATION"); + } + if (auto maybeGrantPermissions = TMaybeNode<TKiModifyPermissions>(input)) { auto requireStatus = RequireChild(*input, 0); if (requireStatus.Level != TStatus::Ok) { diff --git a/ydb/core/kqp/provider/yql_kikimr_expr_nodes.json b/ydb/core/kqp/provider/yql_kikimr_expr_nodes.json index 25282e244c..ed4a2c545b 100644 --- a/ydb/core/kqp/provider/yql_kikimr_expr_nodes.json +++ b/ydb/core/kqp/provider/yql_kikimr_expr_nodes.json @@ -444,6 +444,42 @@ {"Index": 3, "Name": "Settings", "Type": "TCoNameValueTupleList"}, {"Index": 4, "Name": "MissingOk", "Type": "TCoAtom"} ] + }, + { + "Name": "TKiCreateReplication", + "Base": "TCallable", + "Match": {"Type": "Callable", "Name": "KiCreateReplication!"}, + "Children": [ + {"Index": 0, "Name": "World", "Type": "TExprBase"}, + {"Index": 1, "Name": "DataSink", "Type": "TKiDataSink"}, + {"Index": 2, "Name": "Replication", "Type": "TCoAtom"}, + {"Index": 3, "Name": "Targets", "Type": "TCoReplicationTargetList"}, + {"Index": 4, "Name": "ReplicationSettings", "Type": "TCoNameValueTupleList"}, + {"Index": 5, "Name": "Settings", "Type": "TCoNameValueTupleList"} + ] + }, + { + "Name": "TKiAlterReplication", + "Base": "TCallable", + "Match": {"Type": "Callable", "Name": "KiAlterReplication!"}, + "Children": [ + {"Index": 0, "Name": "World", "Type": "TExprBase"}, + {"Index": 1, "Name": "DataSink", "Type": "TKiDataSink"}, + {"Index": 2, "Name": "Replication", "Type": "TCoAtom"}, + {"Index": 3, "Name": "ReplicationSettings", "Type": "TCoNameValueTupleList"}, + {"Index": 4, "Name": "Settings", "Type": "TCoNameValueTupleList"} + ] + }, + { + "Name": "TKiDropReplication", + "Base": "TCallable", + "Match": {"Type": "Callable", "Name": "KiDropReplication!"}, + "Children": [ + {"Index": 0, "Name": "World", "Type": "TExprBase"}, + {"Index": 1, "Name": "DataSink", "Type": "TKiDataSink"}, + {"Index": 2, "Name": "Replication", "Type": "TCoAtom"}, + {"Index": 3, "Name": "Cascade", "Type": "TCoAtom"} + ] } ] } diff --git a/ydb/core/kqp/provider/yql_kikimr_gateway.h b/ydb/core/kqp/provider/yql_kikimr_gateway.h index 336e3ae583..4633dbf055 100644 --- a/ydb/core/kqp/provider/yql_kikimr_gateway.h +++ b/ydb/core/kqp/provider/yql_kikimr_gateway.h @@ -686,6 +686,64 @@ struct TDropExternalTableSettings { TString ExternalTable; }; +struct TReplicationSettings { + struct TStateDone { + enum class EFailoverMode: ui32 { + Consistent = 1, + Force = 2, + }; + + EFailoverMode FailoverMode; + }; + + struct TStaticCredentials { + TString UserName; + TString Password; + }; + + TMaybe<TString> ConnectionString; + TMaybe<TString> Endpoint; + TMaybe<TString> Database; + TMaybe<TString> OAuthToken; + TMaybe<TStaticCredentials> StaticCredentials; + TMaybe<TStateDone> StateDone; + + TStaticCredentials& EnsureStaticCredentials() { + if (!StaticCredentials) { + StaticCredentials = TStaticCredentials(); + } + + return *StaticCredentials; + } + + using EFailoverMode = TStateDone::EFailoverMode; + TStateDone& EnsureStateDone(EFailoverMode mode = EFailoverMode::Consistent) { + if (!StateDone) { + StateDone = TStateDone{ + .FailoverMode = mode, + }; + } + + return *StateDone; + } +}; + +struct TCreateReplicationSettings { + TString Name; + TVector<std::pair<TString, TString>> Targets; + TReplicationSettings Settings; +}; + +struct TAlterReplicationSettings { + TString Name; + TReplicationSettings Settings; +}; + +struct TDropReplicationSettings { + TString Name; + bool Cascade = false; +}; + struct TKikimrListPathItem { TKikimrListPathItem(TString name, bool isDirectory) { Name = name; @@ -838,6 +896,12 @@ public: virtual NThreading::TFuture<TGenericResult> DropTopic(const TString& cluster, const TString& topic) = 0; + virtual NThreading::TFuture<TGenericResult> CreateReplication(const TString& cluster, const TCreateReplicationSettings& settings) = 0; + + virtual NThreading::TFuture<TGenericResult> AlterReplication(const TString& cluster, const TAlterReplicationSettings& settings) = 0; + + virtual NThreading::TFuture<TGenericResult> DropReplication(const TString& cluster, const TDropReplicationSettings& settings) = 0; + virtual NThreading::TFuture<TGenericResult> ModifyPermissions(const TString& cluster, const TModifyPermissionsSettings& settings) = 0; virtual NThreading::TFuture<TGenericResult> CreateUser(const TString& cluster, const TCreateUserSettings& settings) = 0; diff --git a/ydb/core/kqp/provider/yql_kikimr_provider.cpp b/ydb/core/kqp/provider/yql_kikimr_provider.cpp index ebe77e85e0..f6406e0867 100644 --- a/ydb/core/kqp/provider/yql_kikimr_provider.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_provider.cpp @@ -47,6 +47,9 @@ struct TKikimrData { DataSinkNames.insert(TKiCreateTopic::CallableName()); DataSinkNames.insert(TKiAlterTopic::CallableName()); DataSinkNames.insert(TKiDropTopic::CallableName()); + DataSinkNames.insert(TKiCreateReplication::CallableName()); + DataSinkNames.insert(TKiAlterReplication::CallableName()); + DataSinkNames.insert(TKiDropReplication::CallableName()); DataSinkNames.insert(TKiCreateUser::CallableName()); DataSinkNames.insert(TKiModifyPermissions::CallableName()); DataSinkNames.insert(TKiAlterUser::CallableName()); @@ -103,6 +106,9 @@ struct TKikimrData { TYdbOperation::CreateTopic | TYdbOperation::AlterTopic | TYdbOperation::DropTopic | + TYdbOperation::CreateReplication | + TYdbOperation::AlterReplication | + TYdbOperation::DropReplication | TYdbOperation::CreateUser | TYdbOperation::AlterUser | TYdbOperation::DropUser | @@ -407,6 +413,14 @@ bool TKikimrKey::Extract(const TExprNode& key) { return false; } Target = nameNode->Child(0)->Content(); + } else if (tagName == "replication") { + KeyType = Type::Replication; + const TExprNode* nameNode = key.Child(0)->Child(1); + if (!nameNode->IsCallable("String")) { + Ctx.AddError(TIssue(Ctx.GetPosition(key.Pos()), "Expected String as replication key.")); + return false; + } + Target = nameNode->Child(0)->Content(); } else if(tagName == "permission") { KeyType = Type::Permission; Target = key.Child(0)->Child(1)->Child(0)->Content(); diff --git a/ydb/core/kqp/provider/yql_kikimr_provider.h b/ydb/core/kqp/provider/yql_kikimr_provider.h index abcabe0c41..764f14774b 100644 --- a/ydb/core/kqp/provider/yql_kikimr_provider.h +++ b/ydb/core/kqp/provider/yql_kikimr_provider.h @@ -232,7 +232,10 @@ enum class TYdbOperation : ui32 { AlterTopic = 1 << 20, DropTopic = 1 << 21, ModifyPermission = 1 << 22, - RenameGroup = 1 << 23 + RenameGroup = 1 << 23, + CreateReplication = 1 << 24, + AlterReplication = 1 << 25, + DropReplication = 1 << 26, }; Y_DECLARE_FLAGS(TYdbOperations, TYdbOperation); diff --git a/ydb/core/kqp/provider/yql_kikimr_provider_impl.h b/ydb/core/kqp/provider/yql_kikimr_provider_impl.h index f974e26e2a..5f7d3a0341 100644 --- a/ydb/core/kqp/provider/yql_kikimr_provider_impl.h +++ b/ydb/core/kqp/provider/yql_kikimr_provider_impl.h @@ -43,6 +43,10 @@ private: virtual TStatus HandleAlterTopic(NNodes::TKiAlterTopic node, TExprContext& ctx) = 0; virtual TStatus HandleDropTopic(NNodes::TKiDropTopic node, TExprContext& ctx) = 0; + virtual TStatus HandleCreateReplication(NNodes::TKiCreateReplication node, TExprContext& ctx) = 0; + virtual TStatus HandleAlterReplication(NNodes::TKiAlterReplication node, TExprContext& ctx) = 0; + virtual TStatus HandleDropReplication(NNodes::TKiDropReplication node, TExprContext& ctx) = 0; + virtual TStatus HandleCreateUser(NNodes::TKiCreateUser node, TExprContext& ctx) = 0; virtual TStatus HandleAlterUser(NNodes::TKiAlterUser node, TExprContext& ctx) = 0; virtual TStatus HandleDropUser(NNodes::TKiDropUser node, TExprContext& ctx) = 0; @@ -81,7 +85,8 @@ public: Object, Topic, Permission, - PGObject + PGObject, + Replication, }; struct TViewDescription { @@ -110,6 +115,12 @@ public: return Target; } + TString GetReplicationPath() const { + Y_DEBUG_ABORT_UNLESS(KeyType.Defined()); + Y_DEBUG_ABORT_UNLESS(KeyType == Type::Replication); + return Target; + } + TString GetFolderPath() const { Y_DEBUG_ABORT_UNLESS(KeyType.Defined()); Y_DEBUG_ABORT_UNLESS(KeyType == Type::TableList); diff --git a/ydb/core/kqp/provider/yql_kikimr_type_ann.cpp b/ydb/core/kqp/provider/yql_kikimr_type_ann.cpp index 0895383d43..84c08955d7 100644 --- a/ydb/core/kqp/provider/yql_kikimr_type_ann.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_type_ann.cpp @@ -219,6 +219,10 @@ private: { return TStatus::Ok; } + case TKikimrKey::Type::Replication: + { + return TStatus::Ok; + } } return TStatus::Error; @@ -1604,6 +1608,66 @@ virtual TStatus HandleCreateTable(TKiCreateTable create, TExprContext& ctx) over return TStatus::Ok; } + static bool CheckReplicationSettings(const TCoNameValueTupleList& settings, const THashSet<TString>& supported, TExprContext& ctx) { + for (const auto& setting : settings) { + auto name = setting.Name().Value(); + if (!supported.contains(TString(name))) { + ctx.AddError(TIssue(ctx.GetPosition(setting.Name().Pos()), TStringBuilder() << "Unsupported setting" + << ": " << name)); + return false; + } + } + + return true; + } + + virtual TStatus HandleCreateReplication(TKiCreateReplication node, TExprContext& ctx) override { + const THashSet<TString> supportedSettings = { + "connection_string", + "endpoint", + "database", + "token", + "user", + "password", + }; + + if (!CheckReplicationSettings(node.ReplicationSettings(), supportedSettings, ctx)) { + return TStatus::Error; + } + + if (!node.Settings().Empty()) { + ctx.AddError(TIssue(ctx.GetPosition(node.Pos()), "Unsupported settings")); + return TStatus::Error; + } + + node.Ptr()->SetTypeAnn(node.World().Ref().GetTypeAnn()); + return TStatus::Ok; + } + + virtual TStatus HandleAlterReplication(TKiAlterReplication node, TExprContext& ctx) override { + const THashSet<TString> supportedSettings = { + "state", + "failover_mode", + }; + + if (!CheckReplicationSettings(node.ReplicationSettings(), supportedSettings, ctx)) { + return TStatus::Error; + } + + if (!node.Settings().Empty()) { + ctx.AddError(TIssue(ctx.GetPosition(node.Pos()), "Unsupported settings")); + return TStatus::Error; + } + + node.Ptr()->SetTypeAnn(node.World().Ref().GetTypeAnn()); + return TStatus::Ok; + } + + virtual TStatus HandleDropReplication(TKiDropReplication node, TExprContext&) override { + node.Ptr()->SetTypeAnn(node.World().Ref().GetTypeAnn()); + return TStatus::Ok; + } + virtual TStatus HandleModifyPermissions(TKiModifyPermissions node, TExprContext&) override { node.Ptr()->SetTypeAnn(node.World().Ref().GetTypeAnn()); return TStatus::Ok; diff --git a/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp b/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp index c0e84410c0..b6782cd58d 100644 --- a/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp +++ b/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp @@ -5176,6 +5176,258 @@ Y_UNIT_TEST_SUITE(KqpScheme) { auto resultSuccess = session.ExecuteSchemeQuery("DROP EXTERNAL DATA SOURCE test").GetValueSync(); UNIT_ASSERT_C(resultSuccess.GetStatus() == EStatus::SCHEME_ERROR, TStringBuilder{} << resultSuccess.GetStatus() << " " << resultSuccess.GetIssues().ToString()); } + + Y_UNIT_TEST(CreateAsyncReplication) { + TKikimrRunner kikimr; + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + + // connection string & endpoint/database are mutually exclusive + { + auto query = R"( + --!syntax_v1 + CREATE ASYNC REPLICATION `/Root/replication` FOR + `/Root/table` AS `/Root/replica` + WITH ( + CONNECTION_STRING = "grpc://localhost:2135/?database=/Root" + ENDPOINT = "localhost:2135", + DATABASE = "/Root" + ); + )"; + + const auto result = session.ExecuteSchemeQuery(query).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::GENERIC_ERROR, result.GetIssues().ToString()); + } + + // token & user/password are mutually exclusive + { + auto query = R"( + --!syntax_v1 + CREATE ASYNC REPLICATION `/Root/replication` FOR + `/Root/table` AS `/Root/replica` + WITH ( + TOKEN = "foo", + USER = "user", + PASSWORD = "bar" + ); + )"; + + const auto result = session.ExecuteSchemeQuery(query).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::GENERIC_ERROR, result.GetIssues().ToString()); + } + + // unsupported setting (STATE) in CREATE + { + auto query = R"( + --!syntax_v1 + CREATE ASYNC REPLICATION `/Root/replication` FOR + `/Root/table` AS `/Root/replica` + WITH ( + CONNECTION_STRING = "grpc://localhost:2135/?database=/Root", + STATE = "DONE" + ); + )"; + + const auto result = session.ExecuteSchemeQuery(query).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::GENERIC_ERROR, result.GetIssues().ToString()); + } + + { + auto query = R"( + --!syntax_v1 + CREATE TABLE `/Root/table` ( + Key Uint64, + Value String, + PRIMARY KEY (Key) + ); + )"; + + auto result = session.ExecuteSchemeQuery(query).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + } + + // ok + { + auto query = Sprintf(R"( + --!syntax_v1 + CREATE ASYNC REPLICATION `/Root/replication` FOR + `/Root/table` AS `/Root/replica` + WITH ( + ENDPOINT = "%s", + DATABASE = "/Root", + TOKEN = "root@builtin" + ); + )", kikimr.GetEndpoint().c_str()); + + const auto result = session.ExecuteSchemeQuery(query).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + } + } + + Y_UNIT_TEST(AlterAsyncReplication) { + TKikimrRunner kikimr; + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + + // path does not exist + { + auto query = R"( + --!syntax_v1 + ALTER ASYNC REPLICATION `/Root/replication` + SET ( + STATE = "DONE" + ); + )"; + + const auto result = session.ExecuteSchemeQuery(query).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SCHEME_ERROR, result.GetIssues().ToString()); + } + + { + auto query = R"( + --!syntax_v1 + CREATE TABLE `/Root/table` ( + Key Uint64, + Value String, + PRIMARY KEY (Key) + ); + )"; + + auto result = session.ExecuteSchemeQuery(query).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + } + + { + auto query = Sprintf(R"( + --!syntax_v1 + CREATE ASYNC REPLICATION `/Root/replication` FOR + `/Root/table` AS `/Root/replica` + WITH ( + ENDPOINT = "%s", + DATABASE = "/Root", + TOKEN = "root@builtin" + ); + )", kikimr.GetEndpoint().c_str()); + + const auto result = session.ExecuteSchemeQuery(query).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + } + + // unsupported setting (TOKEN) in ALTER + { + auto query = R"( + --!syntax_v1 + ALTER ASYNC REPLICATION `/Root/replication` + SET ( + TOKEN = "foo" + ); + )"; + + const auto result = session.ExecuteSchemeQuery(query).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::GENERIC_ERROR, result.GetIssues().ToString()); + } + + // invalid state + { + auto query = R"( + --!syntax_v1 + ALTER ASYNC REPLICATION `/Root/replication` + SET ( + STATE = "foo" + ); + )"; + + const auto result = session.ExecuteSchemeQuery(query).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::GENERIC_ERROR, result.GetIssues().ToString()); + } + + // invalid failover mode + { + auto query = R"( + --!syntax_v1 + ALTER ASYNC REPLICATION `/Root/replication` + SET ( + FAILOVER_MODE = "foo" + ); + )"; + + const auto result = session.ExecuteSchemeQuery(query).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::GENERIC_ERROR, result.GetIssues().ToString()); + } + + // ok + { + auto query = R"( + --!syntax_v1 + ALTER ASYNC REPLICATION `/Root/replication` + SET ( + STATE = "DONE", + FAILOVER_MODE = "FORCE" + ); + )"; + + const auto result = session.ExecuteSchemeQuery(query).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + } + } + + Y_UNIT_TEST(DropAsyncReplication) { + TKikimrRunner kikimr; + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + + // path does not exist + { + auto query = R"( + --!syntax_v1 + DROP ASYNC REPLICATION `/Root/replication` + )"; + + const auto result = session.ExecuteSchemeQuery(query).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SCHEME_ERROR, result.GetIssues().ToString()); + } + + { + auto query = R"( + --!syntax_v1 + CREATE TABLE `/Root/table` ( + Key Uint64, + Value String, + PRIMARY KEY (Key) + ); + )"; + + auto result = session.ExecuteSchemeQuery(query).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + } + + { + auto query = Sprintf(R"( + --!syntax_v1 + CREATE ASYNC REPLICATION `/Root/replication` FOR + `/Root/table` AS `/Root/replica` + WITH ( + ENDPOINT = "%s", + DATABASE = "/Root", + TOKEN = "root@builtin" + ); + )", kikimr.GetEndpoint().c_str()); + + const auto result = session.ExecuteSchemeQuery(query).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + } + + // ok + { + auto query = R"( + --!syntax_v1 + DROP ASYNC REPLICATION `/Root/replication` + )"; + + const auto result = session.ExecuteSchemeQuery(query).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + } + } } Y_UNIT_TEST_SUITE(KqpOlapScheme) { diff --git a/ydb/core/protos/kqp_physical.proto b/ydb/core/protos/kqp_physical.proto index 62cee353dd..bd13c8d6a5 100644 --- a/ydb/core/protos/kqp_physical.proto +++ b/ydb/core/protos/kqp_physical.proto @@ -441,6 +441,9 @@ message TKqpSchemeOperation { NKikimrSchemeOp.TModifyScheme AlterTableStore = 31; NKikimrSchemeOp.TModifyScheme CreateSequence = 32; NKikimrSchemeOp.TModifyScheme DropSequence = 33; + NKikimrSchemeOp.TModifyScheme CreateReplication = 34; + NKikimrSchemeOp.TModifyScheme AlterReplication = 35; + NKikimrSchemeOp.TModifyScheme DropReplication = 36; } } diff --git a/ydb/library/yql/core/expr_nodes/yql_expr_nodes.json b/ydb/library/yql/core/expr_nodes/yql_expr_nodes.json index 95e76ce023..1da15b8250 100644 --- a/ydb/library/yql/core/expr_nodes/yql_expr_nodes.json +++ b/ydb/library/yql/core/expr_nodes/yql_expr_nodes.json @@ -2547,6 +2547,19 @@ "Base": "TCallable", "Match": {"Type": "Callable", "Name": "ReturningStar"}, "Children": [] + }, + { + "Name": "TCoReplicationTarget", + "Base": "TExprBase", + "Match": {"Type": "Tuple"}, + "Children": [ + {"Index": 0, "Name": "RemotePath", "Type": "TCoAtom"}, + {"Index": 1, "Name": "LocalPath", "Type": "TCoAtom"} + ] + }, + { + "Name": "TCoReplicationTargetList", + "ListBase": "TCoReplicationTarget" } ] } diff --git a/ydb/library/yql/providers/common/provider/yql_provider.cpp b/ydb/library/yql/providers/common/provider/yql_provider.cpp index 3c23ea07ba..ed354a3af4 100644 --- a/ydb/library/yql/providers/common/provider/yql_provider.cpp +++ b/ydb/library/yql/providers/common/provider/yql_provider.cpp @@ -525,6 +525,70 @@ TWriteTopicSettings ParseWriteTopicSettings(TExprList node, TExprContext& ctx) { return ret; } +TWriteReplicationSettings ParseWriteReplicationSettings(TExprList node, TExprContext& ctx) { + TMaybeNode<TCoAtom> mode; + TVector<TCoReplicationTarget> targets; + TVector<TCoNameValueTuple> settings; + TVector<TCoNameValueTuple> other; + + for (auto child : node) { + if (auto maybeTuple = child.Maybe<TCoNameValueTuple>()) { + auto tuple = maybeTuple.Cast(); + auto name = tuple.Name().Value(); + + if (name == "mode") { + YQL_ENSURE(tuple.Value().Maybe<TCoAtom>()); + mode = tuple.Value().Cast<TCoAtom>(); + } else if (name == "targets") { + YQL_ENSURE(tuple.Value().Maybe<TExprList>()); + for (const auto& target : tuple.Value().Cast<TExprList>()) { + auto builtTarget = Build<TCoReplicationTarget>(ctx, node.Pos()); + + YQL_ENSURE(target.Maybe<TCoNameValueTupleList>()); + for (const auto& item : target.Cast<TCoNameValueTupleList>()) { + auto itemName = item.Name().Value(); + if (itemName == "remote") { + builtTarget.RemotePath(item.Value().Cast<TCoAtom>()); + } else if (itemName == "local") { + builtTarget.LocalPath(item.Value().Cast<TCoAtom>()); + } else { + YQL_ENSURE(false, "unknown target item"); + } + } + + targets.push_back(builtTarget.Done()); + } + } else if (name == "settings") { + YQL_ENSURE(tuple.Value().Maybe<TCoNameValueTupleList>()); + for (const auto& item : tuple.Value().Cast<TCoNameValueTupleList>()) { + settings.push_back(item); + } + } else { + other.push_back(tuple); + } + } + } + + const auto& builtTargets = Build<TCoReplicationTargetList>(ctx, node.Pos()) + .Add(targets) + .Done(); + + const auto& builtSettings = Build<TCoNameValueTupleList>(ctx, node.Pos()) + .Add(settings) + .Done(); + + const auto& builtOther = Build<TCoNameValueTupleList>(ctx, node.Pos()) + .Add(other) + .Done(); + + TWriteReplicationSettings ret(builtOther); + ret.Mode = mode; + ret.Targets = builtTargets; + ret.ReplicationSettings = builtSettings; + + return ret; +} + TWriteRoleSettings ParseWriteRoleSettings(TExprList node, TExprContext& ctx) { TMaybeNode<TCoAtom> mode; TVector<TCoAtom> roles; diff --git a/ydb/library/yql/providers/common/provider/yql_provider.h b/ydb/library/yql/providers/common/provider/yql_provider.h index 7c64417300..fd9eacbeed 100644 --- a/ydb/library/yql/providers/common/provider/yql_provider.h +++ b/ydb/library/yql/providers/common/provider/yql_provider.h @@ -83,6 +83,17 @@ struct TWriteTopicSettings { }; +struct TWriteReplicationSettings { + NNodes::TMaybeNode<NNodes::TCoAtom> Mode; + NNodes::TMaybeNode<NNodes::TCoReplicationTargetList> Targets; + NNodes::TMaybeNode<NNodes::TCoNameValueTupleList> ReplicationSettings; + NNodes::TCoNameValueTupleList Other; + + TWriteReplicationSettings(const NNodes::TCoNameValueTupleList& other) + : Other(other) + {} +}; + struct TWriteRoleSettings { NNodes::TMaybeNode<NNodes::TCoAtom> Mode; NNodes::TMaybeNode<NNodes::TCoAtomList> Roles; @@ -153,6 +164,7 @@ TVector<TString> GetResOrPullColumnHints(const TExprNode& node); TWriteTableSettings ParseWriteTableSettings(NNodes::TExprList node, TExprContext& ctx); TWriteTopicSettings ParseWriteTopicSettings(NNodes::TExprList node, TExprContext& ctx); +TWriteReplicationSettings ParseWriteReplicationSettings(NNodes::TExprList node, TExprContext& ctx); TWriteRoleSettings ParseWriteRoleSettings(NNodes::TExprList node, TExprContext& ctx); TWriteObjectSettings ParseWriteObjectSettings(NNodes::TExprList node, TExprContext& ctx); |