diff options
author | Innokentii Mokin <innokentii@ydb.tech> | 2024-11-20 14:48:54 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-11-20 14:48:54 +0300 |
commit | bd7040fb3886ee0a2f61db154b5a79c54f16ef79 (patch) | |
tree | f530192720d16a06d91423d684359e7e40841e9e | |
parent | 0cc149bcbf3defc46d8df66e99c500ad2f636880 (diff) | |
download | ydb-bd7040fb3886ee0a2f61db154b5a79c54f16ef79.tar.gz |
Add backup backup collection op (#11293)
28 files changed, 682 insertions, 81 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp b/ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp index 3c68f35881..a8781780f2 100644 --- a/ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp @@ -372,6 +372,12 @@ public: break; } + case NKqpProto::TKqpSchemeOperation::kBackup: { + const auto& modifyScheme = schemeOp.GetBackup(); + ev->Record.MutableTransaction()->MutableModifyScheme()->CopyFrom(modifyScheme); + break; + } + default: InternalError(TStringBuilder() << "Unexpected scheme operation: " << (ui32) schemeOp.GetOperationCase()); diff --git a/ydb/core/kqp/gateway/kqp_ic_gateway.cpp b/ydb/core/kqp/gateway/kqp_ic_gateway.cpp index 4dc088bfd3..f70e922900 100644 --- a/ydb/core/kqp/gateway/kqp_ic_gateway.cpp +++ b/ydb/core/kqp/gateway/kqp_ic_gateway.cpp @@ -1282,6 +1282,10 @@ public: return NotImplemented<TGenericResult>(); } + TFuture<TGenericResult> Backup(const TString&, const NYql::TBackupSettings&) override { + return NotImplemented<TGenericResult>(); + } + TFuture<TGenericResult> CreateUser(const TString& cluster, const NYql::TCreateUserSettings& settings) override { using TRequest = TEvTxUserProxy::TEvProposeTransaction; diff --git a/ydb/core/kqp/host/kqp_gateway_proxy.cpp b/ydb/core/kqp/host/kqp_gateway_proxy.cpp index 9688d0aecd..1e68c0dc47 100644 --- a/ydb/core/kqp/host/kqp_gateway_proxy.cpp +++ b/ydb/core/kqp/host/kqp_gateway_proxy.cpp @@ -1287,6 +1287,50 @@ public: } } + + TFuture<TGenericResult> Backup(const TString& cluster, const NYql::TBackupSettings& settings) override { + CHECK_PREPARED_DDL(Backup); + + try { + if (cluster != SessionCtx->GetCluster()) { + return MakeFuture(ResultFromError<TGenericResult>("Invalid cluster: " + cluster)); + } + + std::pair<TString, TString> pathPair; + if (settings.Name.StartsWith("/")) { + TString error; + if (!NSchemeHelpers::SplitTablePath(settings.Name, GetDatabase(), pathPair, error, true)) { + return MakeFuture(ResultFromError<TGenericResult>(error)); + } + } else { + pathPair.second = ".backups/collections/" + settings.Name; + } + + NKikimrSchemeOp::TModifyScheme tx; + tx.SetWorkingDir(GetDatabase()); + tx.SetOperationType(NKikimrSchemeOp::ESchemeOpBackupBackupCollection); + + auto& op = *tx.MutableBackupBackupCollection(); + op.SetName(pathPair.second); + + if (IsPrepare()) { + auto& phyQuery = *SessionCtx->Query().PreparingQuery->MutablePhysicalQuery(); + auto& phyTx = *phyQuery.AddTransactions(); + phyTx.SetType(NKqpProto::TKqpPhyTx::TYPE_SCHEME); + phyTx.MutableSchemeOperation()->MutableBackup()->Swap(&tx); + + TGenericResult result; + result.SetSuccess(); + return MakeFuture(result); + } else { + return Gateway->ModifyScheme(std::move(tx)); + } + } + catch (yexception& e) { + return MakeFuture(ResultFromException<TGenericResult>(e)); + } + } + TFuture<TGenericResult> CreateUser(const TString& cluster, const TCreateUserSettings& settings) override { CHECK_PREPARED_DDL(CreateUser); diff --git a/ydb/core/kqp/provider/yql_kikimr_datasink.cpp b/ydb/core/kqp/provider/yql_kikimr_datasink.cpp index bd7de4fda8..afa1d0cf86 100644 --- a/ydb/core/kqp/provider/yql_kikimr_datasink.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_datasink.cpp @@ -167,6 +167,12 @@ private: return TStatus::Ok; } + TStatus HandleBackup(TKiBackup node, TExprContext& ctx) override { + Y_UNUSED(ctx); + Y_UNUSED(node); + return TStatus::Ok; + } + TStatus HandleCreateUser(TKiCreateUser node, TExprContext& ctx) override { ctx.AddError(TIssue(ctx.GetPosition(node.Pos()), TStringBuilder() << "CreateUser is not yet implemented for intent determination transformer")); @@ -599,6 +605,10 @@ public: return true; } + if (node.IsCallable(TKiBackup::CallableName())) { + return true; + } + return false; } @@ -1497,6 +1507,14 @@ public: .Build() .Done() .Ptr(); + } else if (mode == "backup") { + return Build<TKiBackup>(ctx, node->Pos()) + .World(node->Child(0)) + .DataSink(node->Child(1)) + .BackupCollection().Build(key.GetBackupCollectionPath().Name) + .Prefix().Build(key.GetBackupCollectionPath().Prefix) + .Done() + .Ptr(); } else { ctx.AddError(TIssue(ctx.GetPosition(node->Pos()), TStringBuilder() << "Unknown operation type for backup collection: " << TString(mode))); return nullptr; @@ -1768,6 +1786,10 @@ IGraphTransformer::TStatus TKiSinkVisitorTransformer::DoTransform(TExprNode::TPt return HandleDropBackupCollection(node.Cast(), ctx); } + if (auto node = TMaybeNode<TKiBackup>(input)) { + return HandleBackup(node.Cast(), ctx); + } + ctx.AddError(TIssue(ctx.GetPosition(input->Pos()), TStringBuilder() << "(Kikimr DataSink) Unsupported function: " << callable.CallableName())); return TStatus::Error; diff --git a/ydb/core/kqp/provider/yql_kikimr_exec.cpp b/ydb/core/kqp/provider/yql_kikimr_exec.cpp index 189bba353c..32797710cd 100644 --- a/ydb/core/kqp/provider/yql_kikimr_exec.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_exec.cpp @@ -2592,6 +2592,28 @@ public: }, "Executing DROP BACKUP COLLECTION"); } + if (auto maybeBackup = TMaybeNode<TKiBackup>(input)) { + auto requireStatus = RequireChild(*input, 0); + if (requireStatus.Level != TStatus::Ok) { + return SyncStatus(requireStatus); + } + + auto backup = maybeBackup.Cast(); + + TBackupSettings settings; + settings.Name = TString(backup.BackupCollection()); + + auto cluster = TString(backup.DataSink().Cluster()); + auto future = Gateway->Backup(cluster, settings); + + return WrapFuture(future, + [](const IKikimrGateway::TGenericResult& res, const TExprNode::TPtr& input, TExprContext& ctx) { + Y_UNUSED(res); + auto resultNode = ctx.NewWorld(input->Pos()); + return resultNode; + }, "Executing BACKUP"); + } + ctx.AddError(TIssue(ctx.GetPosition(input->Pos()), TStringBuilder() << "(Kikimr DataSink) Failed to execute node: " << input->Content())); return SyncError(); diff --git a/ydb/core/kqp/provider/yql_kikimr_expr_nodes.json b/ydb/core/kqp/provider/yql_kikimr_expr_nodes.json index e92b574926..0c34512251 100644 --- a/ydb/core/kqp/provider/yql_kikimr_expr_nodes.json +++ b/ydb/core/kqp/provider/yql_kikimr_expr_nodes.json @@ -562,6 +562,17 @@ {"Index": 3, "Name": "Prefix", "Type": "TCoAtom"}, {"Index": 4, "Name": "Cascade", "Type": "TCoAtom"} ] + }, + { + "Name": "TKiBackup", + "Base": "TCallable", + "Match": {"Type": "Callable", "Name": "KiBackup!"}, + "Children": [ + {"Index": 0, "Name": "World", "Type": "TExprBase"}, + {"Index": 1, "Name": "DataSink", "Type": "TKiDataSink"}, + {"Index": 2, "Name": "BackupCollection", "Type": "TCoAtom"}, + {"Index": 3, "Name": "Prefix", "Type": "TCoAtom"} + ] } ] } diff --git a/ydb/core/kqp/provider/yql_kikimr_gateway.h b/ydb/core/kqp/provider/yql_kikimr_gateway.h index 9a04060f00..de29871b07 100644 --- a/ydb/core/kqp/provider/yql_kikimr_gateway.h +++ b/ydb/core/kqp/provider/yql_kikimr_gateway.h @@ -876,6 +876,10 @@ struct TDropBackupCollectionSettings { bool Cascade = false; }; +struct TBackupSettings { + TString Name; +}; + struct TKikimrListPathItem { TKikimrListPathItem(TString name, bool isDirectory) { Name = name; @@ -1066,6 +1070,8 @@ public: virtual NThreading::TFuture<TGenericResult> DropBackupCollection(const TString& cluster, const TDropBackupCollectionSettings& settings) = 0; + virtual NThreading::TFuture<TGenericResult> Backup(const TString& cluster, const TBackupSettings& settings) = 0; + virtual NThreading::TFuture<TGenericResult> CreateUser(const TString& cluster, const TCreateUserSettings& settings) = 0; virtual NThreading::TFuture<TGenericResult> AlterUser(const TString& cluster, const TAlterUserSettings& settings) = 0; diff --git a/ydb/core/kqp/provider/yql_kikimr_provider.cpp b/ydb/core/kqp/provider/yql_kikimr_provider.cpp index 24695052ff..bce0f05d5d 100644 --- a/ydb/core/kqp/provider/yql_kikimr_provider.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_provider.cpp @@ -77,6 +77,7 @@ struct TKikimrData { DataSinkNames.insert(TKiCreateBackupCollection::CallableName()); DataSinkNames.insert(TKiAlterBackupCollection::CallableName()); DataSinkNames.insert(TKiDropBackupCollection::CallableName()); + DataSinkNames.insert(TKiBackup::CallableName()); CommitModes.insert(CommitModeFlush); CommitModes.insert(CommitModeRollback); @@ -126,7 +127,8 @@ struct TKikimrData { TYdbOperation::ModifyPermission | TYdbOperation::CreateBackupCollection | TYdbOperation::AlterBackupCollection | - TYdbOperation::DropBackupCollection; + TYdbOperation::DropBackupCollection | + TYdbOperation::Backup; SystemColumns = { {"_yql_partition_id", NKikimr::NUdf::EDataSlot::Uint64} @@ -438,7 +440,7 @@ bool TKikimrKey::Extract(const TExprNode& key) { KeyType = Type::PGObject; Target = key.Child(0)->Child(1)->Child(0)->Content(); ObjectType = key.Child(0)->Child(2)->Child(0)->Content(); - } else if (tagName == "backupCollection") { + } else if (tagName == "backupCollection" || tagName == "backup") { KeyType = Type::BackupCollection; Target = key.Child(0)->Child(1)->Child(0)->Content(); ExplicitPrefix = key.Child(0)->Child(2)->Child(0)->Content(); diff --git a/ydb/core/kqp/provider/yql_kikimr_provider.h b/ydb/core/kqp/provider/yql_kikimr_provider.h index c4b734d93c..7629f08ba3 100644 --- a/ydb/core/kqp/provider/yql_kikimr_provider.h +++ b/ydb/core/kqp/provider/yql_kikimr_provider.h @@ -217,37 +217,38 @@ private: }; enum class TYdbOperation : ui32 { - CreateTable = 1 << 0, - DropTable = 1 << 1, - AlterTable = 1 << 2, - Select = 1 << 3, - Upsert = 1 << 4, - Replace = 1 << 5, - Update = 1 << 6, - Delete = 1 << 7, - InsertRevert = 1 << 8, - InsertAbort = 1 << 9, - ReservedInsertIgnore = 1 << 10, - UpdateOn = 1 << 11, - DeleteOn = 1 << 12, - CreateUser = 1 << 13, - AlterUser = 1 << 14, - DropUser = 1 << 15, - CreateGroup = 1 << 16, - AlterGroup = 1 << 17, - DropGroup = 1 << 18, - CreateTopic = 1 << 19, - AlterTopic = 1 << 20, - DropTopic = 1 << 21, - ModifyPermission = 1 << 22, - RenameGroup = 1 << 23, - CreateReplication = 1 << 24, - AlterReplication = 1 << 25, - DropReplication = 1 << 26, - Analyze = 1 << 27, - CreateBackupCollection = 1 << 28, - AlterBackupCollection = 1 << 29, - DropBackupCollection = 1 << 30, + CreateTable = 1ull << 0, + DropTable = 1ull << 1, + AlterTable = 1ull << 2, + Select = 1ull << 3, + Upsert = 1ull << 4, + Replace = 1ull << 5, + Update = 1ull << 6, + Delete = 1ull << 7, + InsertRevert = 1ull << 8, + InsertAbort = 1ull << 9, + ReservedInsertIgnore = 1ull << 10, + UpdateOn = 1ull << 11, + DeleteOn = 1ull << 12, + CreateUser = 1ull << 13, + AlterUser = 1ull << 14, + DropUser = 1ull << 15, + CreateGroup = 1ull << 16, + AlterGroup = 1ull << 17, + DropGroup = 1ull << 18, + CreateTopic = 1ull << 19, + AlterTopic = 1ull << 20, + DropTopic = 1ull << 21, + ModifyPermission = 1ull << 22, + RenameGroup = 1ull << 23, + CreateReplication = 1ull << 24, + AlterReplication = 1ull << 25, + DropReplication = 1ull << 26, + Analyze = 1ull << 27, + CreateBackupCollection = 1ull << 28, + AlterBackupCollection = 1ull << 29, + DropBackupCollection = 1ull << 30, + Backup = 1ull << 31, }; Y_DECLARE_FLAGS(TYdbOperations, TYdbOperation); diff --git a/ydb/core/kqp/provider/yql_kikimr_provider_impl.h b/ydb/core/kqp/provider/yql_kikimr_provider_impl.h index 099a90eb3f..cf926a23a1 100644 --- a/ydb/core/kqp/provider/yql_kikimr_provider_impl.h +++ b/ydb/core/kqp/provider/yql_kikimr_provider_impl.h @@ -79,6 +79,7 @@ private: virtual TStatus HandleCreateBackupCollection(NNodes::TKiCreateBackupCollection node, TExprContext& ctx) = 0; virtual TStatus HandleAlterBackupCollection(NNodes::TKiAlterBackupCollection node, TExprContext& ctx) = 0; virtual TStatus HandleDropBackupCollection(NNodes::TKiDropBackupCollection node, TExprContext& ctx) = 0; + virtual TStatus HandleBackup(NNodes::TKiBackup node, TExprContext& ctx) = 0; }; class TKikimrKey { diff --git a/ydb/core/kqp/provider/yql_kikimr_type_ann.cpp b/ydb/core/kqp/provider/yql_kikimr_type_ann.cpp index da26a6afa2..ab8b8859b8 100644 --- a/ydb/core/kqp/provider/yql_kikimr_type_ann.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_type_ann.cpp @@ -2180,6 +2180,11 @@ virtual TStatus HandleCreateTable(TKiCreateTable create, TExprContext& ctx) over return TStatus::Ok; } + TStatus HandleBackup(TKiBackup node, TExprContext&) override { + node.Ptr()->SetTypeAnn(node.World().Ref().GetTypeAnn()); + return TStatus::Ok; + } + private: TIntrusivePtr<IKikimrGateway> Gateway; TIntrusivePtr<TKikimrSessionContext> SessionCtx; diff --git a/ydb/core/protos/flat_scheme_op.proto b/ydb/core/protos/flat_scheme_op.proto index e04b7bbff6..1ddfa0ccdb 100644 --- a/ydb/core/protos/flat_scheme_op.proto +++ b/ydb/core/protos/flat_scheme_op.proto @@ -318,6 +318,7 @@ message TTableDescription { // on table creation to allow system column names (started with __ydb_) // It won't be present on describes and won't be preserved optional bool SystemColumnNamesAllowed = 42; + optional bool AllowUnderSameOperation = 44 [default = false]; } message TDictionaryEncodingSettings { @@ -1070,6 +1071,7 @@ message TCopyTableConfig { //TTableDescription implemets copying a table in orig // additionally creates cdc stream on src table consistently with taking snapshot optional TCreateCdcStream CreateSrcCdcStream = 6; + optional bool AllowUnderSameOperation = 7 [default = false]; } message TConsistentTableCopyingConfig { @@ -1617,6 +1619,7 @@ enum EOperationType { ESchemeOpCreateBackupCollection = 105; ESchemeOpAlterBackupCollection = 106; ESchemeOpDropBackupCollection = 107; + ESchemeOpBackupBackupCollection = 109; // Move sequence ESchemeOpMoveSequence = 108; @@ -1807,6 +1810,7 @@ message TModifyScheme { optional TBackupCollectionDescription CreateBackupCollection = 74; optional TBackupCollectionDescription AlterBackupCollection = 75; optional TBackupCollectionDescription DropBackupCollection = 76; + optional TBackupBackupCollection BackupBackupCollection = 78; optional TMove MoveSequence = 77; } @@ -2199,3 +2203,9 @@ message TBackupCollectionDescription { google.protobuf.Empty Cluster = 7; } } + +message TBackupBackupCollection { + optional string Name = 1; + optional NKikimrProto.TPathID PathId = 2; + optional string TargetDir = 3; // must be set on Rewrite +} diff --git a/ydb/core/protos/kqp_physical.proto b/ydb/core/protos/kqp_physical.proto index dede8af2c6..134618fa9f 100644 --- a/ydb/core/protos/kqp_physical.proto +++ b/ydb/core/protos/kqp_physical.proto @@ -478,6 +478,7 @@ message TKqpSchemeOperation { NKikimrSchemeOp.TModifyScheme CreateBackupCollection = 45; NKikimrSchemeOp.TModifyScheme AlterBackupCollection = 46; NKikimrSchemeOp.TModifyScheme DropBackupCollection = 47; + NKikimrSchemeOp.TModifyScheme Backup = 48; } } diff --git a/ydb/core/tx/schemeshard/schemeshard__op_traits.h b/ydb/core/tx/schemeshard/schemeshard__op_traits.h index 49f6777d09..10b463bb69 100644 --- a/ydb/core/tx/schemeshard/schemeshard__op_traits.h +++ b/ydb/core/tx/schemeshard/schemeshard__op_traits.h @@ -1,5 +1,9 @@ #pragma once +#include "schemeshard_path.h" +#include "schemeshard_impl.h" +#include "schemeshard__operation.h" + #include <ydb/core/protos/flat_scheme_op.pb.h> #include <util/generic/string.h> @@ -22,14 +26,19 @@ struct TSchemeTxTraitsFallback { return false; } - static THashMap<TString, THashSet<TString>> GetRequiredPaths(const TTxTransaction& tx) { - Y_UNUSED(tx); + static std::optional<THashMap<TString, THashSet<TString>>> GetRequiredPaths(const TTxTransaction& tx, const TOperationContext& context) { + Y_UNUSED(tx, context); return {}; } - constexpr inline static bool CreateDirsFromName = false; + static bool Rewrite(TTxTransaction& tx) { + Y_UNUSED(tx); + return false; + } + constexpr inline static bool CreateDirsFromName = false; constexpr inline static bool CreateAdditionalDirs = false; + constexpr inline static bool NeedRewrite = false; }; template <NKikimrSchemeOp::EOperationType opType> @@ -290,4 +299,67 @@ struct TSchemeTxTraits<NKikimrSchemeOp::EOperationType::ESchemeOpCreateBackupCol constexpr inline static bool CreateDirsFromName = true; }; +template <> +struct TSchemeTxTraits<NKikimrSchemeOp::EOperationType::ESchemeOpBackupBackupCollection> : public TSchemeTxTraitsFallback { + static std::optional<THashMap<TString, THashSet<TString>>> GetRequiredPaths(const TTxTransaction& tx, const TOperationContext& context) { + THashMap<TString, THashSet<TString>> paths; + + const auto& backupOp = tx.GetBackupBackupCollection(); + + const auto& targetDir = backupOp.GetTargetDir(); + const TString& targetPath = JoinPath({tx.GetWorkingDir(), tx.GetBackupBackupCollection().GetName()}); + + const TPath& bcPath = TPath::Resolve(targetPath, context.SS); + { + auto checks = bcPath.Check(); + checks + .NotEmpty() + .NotUnderDomainUpgrade() + .IsAtLocalSchemeShard() + .IsResolved() + .NotUnderDeleting() + .NotUnderOperation() + .IsBackupCollection(); + + if (!checks) { + return {}; + } + } + + Y_ABORT_UNLESS(context.SS->BackupCollections.contains(bcPath->PathId)); + const auto& bc = context.SS->BackupCollections[bcPath->PathId]; + + auto& collectionPaths = paths[targetPath]; + + for (const auto& item : bc->Description.GetExplicitEntryList().GetEntries()) { + std::pair<TString, TString> paths; + TString err; + if (!TrySplitPathByDb(item.GetPath(), tx.GetWorkingDir(), paths, err)) { + return {}; + } + + auto pathPieces = SplitPath(paths.second); + if (pathPieces.size() > 1) { + auto parent = ExtractParent(paths.second); + collectionPaths.emplace(JoinPath({targetDir, TString(parent)})); + } + } + + return paths; + } + + static bool Rewrite(TTxTransaction& tx) { + auto now = ToX509String(TlsActivationContext->AsActorContext().Now()); + tx.MutableBackupBackupCollection()->SetTargetDir(now + "_full"); + return true; + } + + constexpr inline static bool CreateAdditionalDirs = true; + constexpr inline static bool NeedRewrite = true; +private: + static inline TString ToX509String(const TInstant& datetime) { + return datetime.FormatLocalTime("%Y%m%d%H%M%SZ"); + } +}; + } // namespace NKikimr::NSchemeShard diff --git a/ydb/core/tx/schemeshard/schemeshard__operation.cpp b/ydb/core/tx/schemeshard/schemeshard__operation.cpp index 2cd9a53027..2d7c65dff3 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation.cpp @@ -176,6 +176,7 @@ bool TSchemeShard::ProcessOperationParts( } THolder<TProposeResponse> TSchemeShard::IgniteOperation(TProposeRequest& request, TOperationContext& context) { + using namespace NGenerated; THolder<TProposeResponse> response = nullptr; auto selfId = SelfTabletId(); @@ -211,13 +212,33 @@ THolder<TProposeResponse> TSchemeShard::IgniteOperation(TProposeRequest& request } } + // + + TVector<TTxTransaction> rewrittenTransactions; + + // # Phase Zero + // Rewrites transactions. + // It may fill or clear particular fields based on some runtime SS state. + + for (auto tx : record.GetTransaction()) { + if (DispatchOp(tx, [&](auto traits) { return traits.NeedRewrite && !traits.Rewrite(tx); })) { + response.Reset(new TProposeResponse(NKikimrScheme::StatusPreconditionFailed, ui64(txId), ui64(selfId))); + response->SetError(NKikimrScheme::StatusPreconditionFailed, "Invalid schema rewrite rule."); + return std::move(response); + } + + rewrittenTransactions.push_back(std::move(tx)); + } + + // + TVector<TTxTransaction> transactions; TVector<TTxTransaction> generatedTransactions; // # Phase One // Generate MkDir transactions based on object name. - for (const auto& transaction : record.GetTransaction()) { + for (const auto& transaction : rewrittenTransactions) { auto splitResult = operation->SplitIntoTransactions(transaction, context); if (splitResult.Status != NKikimrScheme::StatusSuccess) { response.Reset(new TProposeResponse(splitResult.Status, ui64(txId), ui64(selfId))); @@ -261,6 +282,8 @@ THolder<TProposeResponse> TSchemeShard::IgniteOperation(TProposeRequest& request } } + // + return std::move(response); } @@ -730,7 +753,7 @@ NTabletFlatExecutor::ITransaction* TSchemeShard::CreateTxOperationReply(TOperati TString JoinPath(const TString& workingDir, const TString& name) { - Y_ABORT_UNLESS(!name.StartsWith('/') && !name.EndsWith('/')); + Y_ABORT_UNLESS(!name.StartsWith('/') && !name.EndsWith('/'), "%s", name.c_str()); return TStringBuilder() << workingDir << (workingDir.EndsWith('/') ? "" : "/") @@ -957,29 +980,31 @@ TOperation::TSplitTransactionsResult TOperation::SplitIntoTransactions(const TTx // # Generates MkDirs based on transaction-specific requirements if (DispatchOp(tx, [&](auto traits) { return traits.CreateAdditionalDirs; })) { - for (const auto& [parentPathStr, pathStrs] : DispatchOp(tx, [&](auto traits) { return traits.GetRequiredPaths(tx); })) { - const TPath parentPath = TPath::Resolve(parentPathStr, context.SS); - { - TPath::TChecker checks = parentPath.Check(); - checks - .NotUnderDomainUpgrade() - .IsAtLocalSchemeShard() - .IsResolved() - .NotDeleted() - .NotUnderDeleting() - .IsCommonSensePath() - .IsLikeDirectory(); - - if (!checks) { - result.Transaction = tx; - return result; + if (auto requiredPaths = DispatchOp(tx, [&](auto traits) { return traits.GetRequiredPaths(tx, context); }); requiredPaths) { + for (const auto& [parentPathStr, pathStrs] : *requiredPaths) { + const TPath parentPath = TPath::Resolve(parentPathStr, context.SS); + { + TPath::TChecker checks = parentPath.Check(); + checks + .NotUnderDomainUpgrade() + .IsAtLocalSchemeShard() + .IsResolved() + .NotDeleted() + .NotUnderDeleting() + .IsCommonSensePath() + .IsLikeDirectory(); + + if (!checks) { + result.Transaction = tx; + return result; + } } - } - for (const auto& pathStr : pathStrs) { - TPath path = TPath::Resolve(JoinPath(parentPathStr, pathStr), context.SS); - if (!CreateDirs(tx, parentPath, path, createdPaths, result)) { - return result; + for (const auto& pathStr : pathStrs) { + TPath path = TPath::Resolve(JoinPath(parentPathStr, pathStr), context.SS); + if (!CreateDirs(tx, parentPath, path, createdPaths, result)) { + return result; + } } } } @@ -1483,6 +1508,8 @@ TVector<ISubOperation::TPtr> TOperation::ConstructParts(const TTxTransaction& tx case NKikimrSchemeOp::EOperationType::ESchemeOpDropBackupCollection: return {CreateDropBackupCollection(NextPartId(), tx)}; + case NKikimrSchemeOp::EOperationType::ESchemeOpBackupBackupCollection: + return CreateBackupBackupCollection(NextPartId(), tx, context); } Y_UNREACHABLE(); diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_backup_backup_collection.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_backup_backup_collection.cpp new file mode 100644 index 0000000000..77e25089ea --- /dev/null +++ b/ydb/core/tx/schemeshard/schemeshard__operation_backup_backup_collection.cpp @@ -0,0 +1,110 @@ +#include "schemeshard__operation_common.h" +#include "schemeshard__operation_create_cdc_stream.h" +#include "schemeshard_impl.h" + +#include <ydb/core/tx/schemeshard/backup/constants.h> + +namespace NKikimr::NSchemeShard { + +TVector<ISubOperation::TPtr> CreateBackupBackupCollection(TOperationId opId, const TTxTransaction& tx, TOperationContext& context) { + TVector<ISubOperation::TPtr> result; + + NKikimrSchemeOp::TModifyScheme modifyScheme; + modifyScheme.SetOperationType(NKikimrSchemeOp::ESchemeOpCreateConsistentCopyTables); + modifyScheme.SetInternal(true); + + auto& cct = *modifyScheme.MutableCreateConsistentCopyTables(); + auto& copyTables = *cct.MutableCopyTableDescriptions(); + const auto workingDirPath = TPath::Resolve(tx.GetWorkingDir(), context.SS); + + TString bcPathStr = JoinPath({tx.GetWorkingDir(), tx.GetBackupBackupCollection().GetName()}); + + const TPath& bcPath = TPath::Resolve(bcPathStr, context.SS); + { + auto checks = bcPath.Check(); + checks + .NotEmpty() + .NotUnderDomainUpgrade() + .IsAtLocalSchemeShard() + .IsResolved() + .NotUnderDeleting() + .NotUnderOperation() + .IsBackupCollection(); + + if (!checks) { + result = {CreateReject(opId, checks.GetStatus(), checks.GetError())}; + return result; + } + } + + Y_ABORT_UNLESS(context.SS->BackupCollections.contains(bcPath->PathId)); + const auto& bc = context.SS->BackupCollections[bcPath->PathId]; + bool incrBackupEnabled = bc->Description.HasIncrementalBackupConfig(); + + for (const auto& item : bc->Description.GetExplicitEntryList().GetEntries()) { + auto& desc = *copyTables.Add(); + desc.SetSrcPath(item.GetPath()); + std::pair<TString, TString> paths; + TString err; + if (!TrySplitPathByDb(item.GetPath(), bcPath.GetDomainPathString(), paths, err)) { + result = {CreateReject(opId, NKikimrScheme::StatusInvalidParameter, err)}; + return {}; + } + auto& relativeItemPath = paths.second; + desc.SetDstPath(JoinPath({tx.GetWorkingDir(), tx.GetBackupBackupCollection().GetName(), tx.GetBackupBackupCollection().GetTargetDir(), relativeItemPath})); + desc.SetOmitIndexes(true); + desc.SetOmitFollowers(true); + desc.SetAllowUnderSameOperation(true); + + if (incrBackupEnabled) { + NKikimrSchemeOp::TCreateCdcStream createCdcStreamOp; + createCdcStreamOp.SetTableName(item.GetPath()); + auto& streamDescription = *createCdcStreamOp.MutableStreamDescription(); + streamDescription.SetName(NBackup::CB_CDC_STREAM_NAME); + streamDescription.SetMode(NKikimrSchemeOp::ECdcStreamModeUpdate); + streamDescription.SetFormat(NKikimrSchemeOp::ECdcStreamFormatProto); + + const auto sPath = TPath::Resolve(item.GetPath(), context.SS); + NCdc::DoCreateStreamImpl(result, createCdcStreamOp, opId, sPath, false, false); + + desc.MutableCreateSrcCdcStream()->CopyFrom(createCdcStreamOp); + } + } + + if (!CreateConsistentCopyTables(opId, modifyScheme, context, result)) { + return result; + } + + if (incrBackupEnabled) { + for (const auto& item : bc->Description.GetExplicitEntryList().GetEntries()) { + NKikimrSchemeOp::TCreateCdcStream createCdcStreamOp; + createCdcStreamOp.SetTableName(item.GetPath()); + auto& streamDescription = *createCdcStreamOp.MutableStreamDescription(); + streamDescription.SetName(NBackup::CB_CDC_STREAM_NAME); + streamDescription.SetMode(NKikimrSchemeOp::ECdcStreamModeUpdate); + streamDescription.SetFormat(NKikimrSchemeOp::ECdcStreamFormatProto); + + const auto sPath = TPath::Resolve(item.GetPath(), context.SS); + auto table = context.SS->Tables.at(sPath.Base()->PathId); + + TVector<TString> boundaries; + const auto& partitions = table->GetPartitions(); + boundaries.reserve(partitions.size() - 1); + + for (ui32 i = 0; i < partitions.size(); ++i) { + const auto& partition = partitions.at(i); + if (i != partitions.size() - 1) { + boundaries.push_back(partition.EndOfRange); + } + } + + const auto streamPath = sPath.Child(NBackup::CB_CDC_STREAM_NAME); + + NCdc::DoCreatePqPart(result, createCdcStreamOp, opId, streamPath, NBackup::CB_CDC_STREAM_NAME, table, boundaries, false); + } + } + + return result; +} + +} // namespace NKikimr::NSchemeShard diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_consistent_copy_tables.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_consistent_copy_tables.cpp index c9b3a1ab0c..a50c0e82ea 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_consistent_copy_tables.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_consistent_copy_tables.cpp @@ -9,7 +9,7 @@ #include <util/generic/algorithm.h> -NKikimrSchemeOp::TModifyScheme CopyTableTask(NKikimr::NSchemeShard::TPath& src, NKikimr::NSchemeShard::TPath& dst, bool omitFollowers, bool isBackup) { +NKikimrSchemeOp::TModifyScheme CopyTableTask(NKikimr::NSchemeShard::TPath& src, NKikimr::NSchemeShard::TPath& dst, bool omitFollowers, bool isBackup, bool allowUnderSameOp) { using namespace NKikimr::NSchemeShard; auto scheme = TransactionTemplate(dst.Parent().PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpCreateTable); @@ -20,6 +20,7 @@ NKikimrSchemeOp::TModifyScheme CopyTableTask(NKikimr::NSchemeShard::TPath& src, operation->SetCopyFromTable(src.PathString()); operation->SetOmitFollowers(omitFollowers); operation->SetIsBackup(isBackup); + operation->SetAllowUnderSameOperation(allowUnderSameOp); return scheme; } @@ -47,14 +48,20 @@ NKikimrSchemeOp::TModifyScheme CreateIndexTask(NKikimr::NSchemeShard::TTableInde namespace NKikimr::NSchemeShard { -TVector<ISubOperation::TPtr> CreateConsistentCopyTables(TOperationId nextId, const TTxTransaction& tx, TOperationContext& context) { +bool CreateConsistentCopyTables( + TOperationId nextId, + const TTxTransaction& tx, + TOperationContext& context, + TVector<ISubOperation::TPtr>& result) +{ Y_ABORT_UNLESS(tx.GetOperationType() == NKikimrSchemeOp::EOperationType::ESchemeOpCreateConsistentCopyTables); const auto& op = tx.GetCreateConsistentCopyTables(); if (0 == op.CopyTableDescriptionsSize()) { TString msg = TStringBuilder() << "no task to do, empty list CopyTableDescriptions"; - return {CreateReject(nextId, NKikimrScheme::EStatus::StatusInvalidParameter, msg)}; + result = {CreateReject(nextId, NKikimrScheme::EStatus::StatusInvalidParameter, msg)}; + return false; } TPath firstPath = TPath::Resolve(op.GetCopyTableDescriptions(0).GetSrcPath(), context.SS); @@ -66,7 +73,8 @@ TVector<ISubOperation::TPtr> CreateConsistentCopyTables(TOperationId nextId, con .IsAtLocalSchemeShard(); if (!checks) { - return {CreateReject(nextId, checks.GetStatus(), checks.GetError())}; + result = {CreateReject(nextId, checks.GetStatus(), checks.GetError())}; + return false; } } @@ -80,20 +88,20 @@ TVector<ISubOperation::TPtr> CreateConsistentCopyTables(TOperationId nextId, con : limits.MaxConsistentCopyTargets; if (op.CopyTableDescriptionsSize() > limit) { - return {CreateReject(nextId, NKikimrScheme::EStatus::StatusInvalidParameter, TStringBuilder() + result = {CreateReject(nextId, NKikimrScheme::EStatus::StatusInvalidParameter, TStringBuilder() << "Consistent copy object count limit exceeded" << ", limit: " << limit << ", objects: " << op.CopyTableDescriptionsSize() )}; + return false; } TString errStr; if (!context.SS->CheckApplyIf(tx, errStr)) { - return {CreateReject(nextId, NKikimrScheme::EStatus::StatusPreconditionFailed, errStr)}; + result = {CreateReject(nextId, NKikimrScheme::EStatus::StatusPreconditionFailed, errStr)}; + return false; } - TVector<ISubOperation::TPtr> result; - for (const auto& descr: op.GetCopyTableDescriptions()) { const auto& srcStr = descr.GetSrcPath(); const auto& dstStr = descr.GetDstPath(); @@ -108,7 +116,8 @@ TVector<ISubOperation::TPtr> CreateConsistentCopyTables(TOperationId nextId, con .IsTheSameDomain(firstPath); if (!checks) { - return {CreateReject(nextId, checks.GetStatus(), checks.GetError())}; + result = {CreateReject(nextId, checks.GetStatus(), checks.GetError())}; + return false; } } @@ -135,7 +144,7 @@ TVector<ISubOperation::TPtr> CreateConsistentCopyTables(TOperationId nextId, con } result.push_back(CreateCopyTable(NextPartId(nextId, result), - CopyTableTask(srcPath, dstPath, descr.GetOmitFollowers(), descr.GetIsBackup()), sequences)); + CopyTableTask(srcPath, dstPath, descr.GetOmitFollowers(), descr.GetIsBackup(), descr.GetAllowUnderSameOperation()), sequences)); TVector<NKikimrSchemeOp::TSequenceDescription> sequenceDescriptions; for (const auto& child: srcPath.Base()->GetChildren()) { @@ -176,7 +185,7 @@ TVector<ISubOperation::TPtr> CreateConsistentCopyTables(TOperationId nextId, con TPath dstImplTable = dstIndexPath.Child(srcImplTableName); result.push_back(CreateCopyTable(NextPartId(nextId, result), - CopyTableTask(srcImplTable, dstImplTable, descr.GetOmitFollowers(), descr.GetIsBackup()))); + CopyTableTask(srcImplTable, dstImplTable, descr.GetOmitFollowers(), descr.GetIsBackup(), descr.GetAllowUnderSameOperation()))); } for (auto&& sequenceDescription : sequenceDescriptions) { @@ -193,6 +202,14 @@ TVector<ISubOperation::TPtr> CreateConsistentCopyTables(TOperationId nextId, con } } + return true; +} + +TVector<ISubOperation::TPtr> CreateConsistentCopyTables(TOperationId nextId, const TTxTransaction& tx, TOperationContext& context) { + TVector<ISubOperation::TPtr> result; + + CreateConsistentCopyTables(nextId, tx, context, result); + return result; } diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_copy_table.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_copy_table.cpp index cadf213e63..41e9a5570a 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_copy_table.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_copy_table.cpp @@ -401,9 +401,12 @@ public: .IsUnderTheSameOperation(OperationId.GetTxId()); //allow only as part of copying base table } else { checks - .NotUnderOperation() .IsCommonSensePath() .IsLikeDirectory(); + + if (!Transaction.GetCreateTable().GetAllowUnderSameOperation()) { + checks.NotUnderOperation(); + } } } @@ -775,9 +778,12 @@ TVector<ISubOperation::TPtr> CreateCopyTable(TOperationId nextId, const TTxTrans .NotDeleted() .NotUnderDeleting() .IsTable() - .NotUnderOperation() .IsCommonSensePath(); //forbid copy impl index tables directly + if (!copying.GetAllowUnderSameOperation()) { + checks.NotUnderOperation(); + } + if (!checks) { return {CreateReject(nextId, checks.GetStatus(), checks.GetError())}; } diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp index 39edfcb062..7c9b8b67c3 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp @@ -712,6 +712,19 @@ static void FillModifySchemaForCdc( } } +void DoCreateStreamImpl( + TVector<ISubOperation::TPtr>& result, + const NKikimrSchemeOp::TCreateCdcStream& op, + const TOperationId& opId, + const TPath& tablePath, + const bool acceptExisted, + const bool initialScan) +{ + auto outTx = TransactionTemplate(tablePath.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpCreateCdcStreamImpl); + FillModifySchemaForCdc(outTx, op, opId, acceptExisted, initialScan); + result.push_back(CreateNewCdcStreamImpl(NextPartId(opId, result), outTx)); +} + void DoCreateStream( TVector<ISubOperation::TPtr>& result, const NKikimrSchemeOp::TCreateCdcStream& op, @@ -721,11 +734,8 @@ void DoCreateStream( const bool acceptExisted, const bool initialScan) { - { - auto outTx = TransactionTemplate(tablePath.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpCreateCdcStreamImpl); - FillModifySchemaForCdc(outTx, op, opId, acceptExisted, initialScan); - result.push_back(CreateNewCdcStreamImpl(NextPartId(opId, result), outTx)); - } + DoCreateStreamImpl(result, op, opId, tablePath, acceptExisted, initialScan); + { auto outTx = TransactionTemplate(workingDirPath.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpCreateCdcStreamAtTable); FillModifySchemaForCdc(outTx, op, opId, acceptExisted, initialScan); diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.h b/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.h index 1aa03529be..4ba199de6c 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.h +++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.h @@ -22,6 +22,14 @@ std::variant<TStreamPaths, ISubOperation::TPtr> DoNewStreamPathChecks( bool acceptExisted, bool restore = false); +void DoCreateStreamImpl( + TVector<ISubOperation::TPtr>& result, + const NKikimrSchemeOp::TCreateCdcStream& op, + const TOperationId& opId, + const TPath& tablePath, + const bool acceptExisted, + const bool initialScan); + void DoCreateStream( TVector<ISubOperation::TPtr>& result, const NKikimrSchemeOp::TCreateCdcStream& op, diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_part.h b/ydb/core/tx/schemeshard/schemeshard__operation_part.h index b784ef579b..4b5cfe312e 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_part.h +++ b/ydb/core/tx/schemeshard/schemeshard__operation_part.h @@ -459,6 +459,11 @@ ISubOperation::TPtr CreateDropTableIndex(TOperationId id, TTxState::ETxState sta ISubOperation::TPtr CreateAlterTableIndex(TOperationId id, const TTxTransaction& tx); ISubOperation::TPtr CreateAlterTableIndex(TOperationId id, TTxState::ETxState state); +bool CreateConsistentCopyTables( + TOperationId nextId, + const TTxTransaction& tx, + TOperationContext& context, + TVector<ISubOperation::TPtr>& result); TVector<ISubOperation::TPtr> CreateConsistentCopyTables(TOperationId nextId, const TTxTransaction& tx, TOperationContext& context); ISubOperation::TPtr CreateNewOlapStore(TOperationId id, const TTxTransaction& tx); @@ -655,6 +660,7 @@ ISubOperation::TPtr CreateNewBackupCollection(TOperationId id, TTxState::ETxStat ISubOperation::TPtr CreateDropBackupCollection(TOperationId id, const TTxTransaction& tx); ISubOperation::TPtr CreateDropBackupCollection(TOperationId id, TTxState::ETxState state); +TVector<ISubOperation::TPtr> CreateBackupBackupCollection(TOperationId opId, const TTxTransaction& tx, TOperationContext& context); } } diff --git a/ydb/core/tx/schemeshard/schemeshard_audit_log_fragment.cpp b/ydb/core/tx/schemeshard/schemeshard_audit_log_fragment.cpp index 5a4cd7fe80..eb091e78d9 100644 --- a/ydb/core/tx/schemeshard/schemeshard_audit_log_fragment.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_audit_log_fragment.cpp @@ -255,6 +255,9 @@ TString DefineUserOperationName(const NKikimrSchemeOp::TModifyScheme& tx) { return "ALTER BACKUP COLLECTION"; case NKikimrSchemeOp::EOperationType::ESchemeOpDropBackupCollection: return "DROP BACKUP COLLECTION"; + + case NKikimrSchemeOp::EOperationType::ESchemeOpBackupBackupCollection: + return "BACKUP"; } Y_ABORT("switch should cover all operation types"); } @@ -576,6 +579,10 @@ TVector<TString> ExtractChangingPaths(const NKikimrSchemeOp::TModifyScheme& tx) case NKikimrSchemeOp::EOperationType::ESchemeOpDropBackupCollection: result.emplace_back(NKikimr::JoinPath({tx.GetWorkingDir(), tx.GetDropBackupCollection().GetName()})); break; + + case NKikimrSchemeOp::EOperationType::ESchemeOpBackupBackupCollection: + result.emplace_back(NKikimr::JoinPath({tx.GetWorkingDir(), tx.GetBackupBackupCollection().GetName()})); + break; } return result; diff --git a/ydb/core/tx/schemeshard/ut_backup_collection/ut_backup_collection.cpp b/ydb/core/tx/schemeshard/ut_backup_collection/ut_backup_collection.cpp index fd6d3e7b9d..bb772d5236 100644 --- a/ydb/core/tx/schemeshard/ut_backup_collection/ut_backup_collection.cpp +++ b/ydb/core/tx/schemeshard/ut_backup_collection/ut_backup_collection.cpp @@ -288,4 +288,58 @@ Y_UNIT_TEST_SUITE(TBackupCollectionTests) { )"); env.TestWaitNotification(runtime, txId); } + + Y_UNIT_TEST(BackupAbsentCollection) { + TTestBasicRuntime runtime; + TTestEnv env(runtime, TTestEnvOptions().EnableBackupService(true)); + ui64 txId = 100; + + SetupLogging(runtime); + + PrepareDirs(runtime, env, txId); + + TestBackupBackupCollection(runtime, ++txId, "/MyRoot", + R"(Name: ".backups/collections/)" DEFAULT_NAME_1 R"(")", + {NKikimrScheme::EStatus::StatusPathDoesNotExist}); + env.TestWaitNotification(runtime, txId); + } + + Y_UNIT_TEST(BackupDroppedCollection) { + TTestBasicRuntime runtime; + TTestEnv env(runtime, TTestEnvOptions().EnableBackupService(true)); + ui64 txId = 100; + + SetupLogging(runtime); + + PrepareDirs(runtime, env, txId); + + TestCreateBackupCollection(runtime, ++txId, "/MyRoot/.backups/collections", DefaultCollectionSettings()); + env.TestWaitNotification(runtime, txId); + + TestDropBackupCollection(runtime, ++txId, "/MyRoot/.backups/collections", "Name: \"" DEFAULT_NAME_1 "\""); + env.TestWaitNotification(runtime, txId); + + TestBackupBackupCollection(runtime, ++txId, "/MyRoot", + R"(Name: ".backups/collections/)" DEFAULT_NAME_1 R"(")", + {NKikimrScheme::EStatus::StatusPathDoesNotExist}); + env.TestWaitNotification(runtime, txId); + } + + Y_UNIT_TEST(BackupAbsentDirs) { + TTestBasicRuntime runtime; + TTestEnv env(runtime, TTestEnvOptions().EnableBackupService(true)); + ui64 txId = 100; + + SetupLogging(runtime); + + PrepareDirs(runtime, env, txId); + + TestCreateBackupCollection(runtime, ++txId, "/MyRoot/.backups/collections", DefaultCollectionSettings()); + env.TestWaitNotification(runtime, txId); + + TestBackupBackupCollection(runtime, ++txId, "/MyRoot", + R"(Name: ".backups/collections/)" DEFAULT_NAME_1 R"(")", + {NKikimrScheme::EStatus::StatusPathDoesNotExist}); + env.TestWaitNotification(runtime, txId); + } } // TBackupCollectionTests diff --git a/ydb/core/tx/schemeshard/ut_base/ut_base.cpp b/ydb/core/tx/schemeshard/ut_base/ut_base.cpp index 5228885af3..170d5f3c29 100644 --- a/ydb/core/tx/schemeshard/ut_base/ut_base.cpp +++ b/ydb/core/tx/schemeshard/ut_base/ut_base.cpp @@ -11376,4 +11376,138 @@ Y_UNIT_TEST_SUITE(TSchemeShardTest) { TestCopyTable(runtime, ++txId, "/MyRoot", "SystemColumnInCopyAllowed", "/MyRoot/SystemColumnAllowed"); } + + Y_UNIT_TEST_FLAG(BackupBackupCollection, WithIncremental) { + TTestBasicRuntime runtime; + TTestEnv env(runtime, TTestEnvOptions().EnableBackupService(true)); + ui64 txId = 100; + + auto defaultCollectionSettings = []() { + return TString(R"( + Name: "MyCollection1" + ExplicitEntryList { + Entries { + Type: ETypeTable + Path: "/MyRoot/Table1" + } + Entries { + Type: ETypeTable + Path: "/MyRoot/DirA/Table2" + } + Entries { + Type: ETypeTable + Path: "/MyRoot/DirA/DirB/Table3" + } + } + Cluster {} + )") + (WithIncremental ? TString("IncrementalBackupConfig {} \n") : TString()); + }; + + AsyncMkDir(runtime, ++txId, "/MyRoot", "DirA"); + AsyncMkDir(runtime, ++txId, "/MyRoot/DirA", "DirB"); + AsyncCreateTable(runtime, ++txId, "/MyRoot", R"( + Name: "Table1" + Columns { Name: "key" Type: "Uint64" } + Columns { Name: "value0" Type: "Utf8" } + KeyColumnNames: ["key"] + )"); + AsyncCreateIndexedTable(runtime, ++txId, "/MyRoot/DirA", R"( + TableDescription { + Name: "Table2" + Columns { Name: "key" Type: "Uint64" } + Columns { Name: "value0" Type: "Utf8" } + Columns { Name: "value1" Type: "Utf8" } + KeyColumnNames: ["key"] + } + IndexDescription { + Name: "UserDefinedIndexByValue0" + KeyColumnNames: ["value0"] + } + IndexDescription { + Name: "UserDefinedIndexByValues" + KeyColumnNames: ["value0", "value1"] + } + IndexDescription { + Name: "UserDefinedIndexByValue0CoveringValue1" + KeyColumnNames: ["value0"] + DataColumnNames: ["value1"] + } + )"); + AsyncCreateTable(runtime, ++txId, "/MyRoot/DirA/DirB", R"( + Name: "Table3" + Columns { Name: "key" Type: "Uint64" } + Columns { Name: "value0" Type: "Utf8" } + KeyColumnNames: ["key"] + )"); + + TestModificationResult(runtime, txId - 4, NKikimrScheme::StatusAccepted); + TestModificationResult(runtime, txId - 3, NKikimrScheme::StatusAccepted); + TestModificationResult(runtime, txId - 2, NKikimrScheme::StatusAccepted); + TestModificationResult(runtime, txId - 1, NKikimrScheme::StatusAccepted); + TestModificationResult(runtime, txId - 0, NKikimrScheme::StatusAccepted); + + env.TestWaitNotification(runtime, {txId, txId - 1 , txId - 2, txId - 3, txId - 4}); + + TestMkDir(runtime, ++txId, "/MyRoot", ".backups/collections"); + TestCreateBackupCollection(runtime, ++txId, "/MyRoot/.backups/collections", defaultCollectionSettings()); + + env.TestWaitNotification(runtime, {txId, txId - 1}); + + TestBackupBackupCollection(runtime, ++txId, "/MyRoot", R"( + Name: ".backups/collections/MyCollection1" + )"); + + env.TestWaitNotification(runtime, txId); + + TestDescribeResult(DescribePath(runtime, "/MyRoot/.backups/collections/MyCollection1"), { + NLs::PathExist, + NLs::IsBackupCollection, + NLs::ChildrenCount(1), + NLs::Finished, + }); + + auto descr = DescribePath(runtime, "/MyRoot/.backups/collections/MyCollection1").GetPathDescription(); + UNIT_ASSERT_VALUES_EQUAL(descr.GetChildren().size(), 1); + + auto backupDirName = descr.GetChildren(0).GetName().c_str(); + + TestDescribeResult(DescribePath(runtime, Sprintf("/MyRoot/.backups/collections/MyCollection1/%s", backupDirName)), { + NLs::PathExist, + NLs::ChildrenCount(2), + NLs::Finished, + }); + + TestDescribeResult(DescribePath(runtime, Sprintf("/MyRoot/.backups/collections/MyCollection1/%s/Table1", backupDirName)), { + NLs::PathExist, + NLs::IsTable, + NLs::Finished, + }); + + TestDescribeResult(DescribePath(runtime, Sprintf("/MyRoot/.backups/collections/MyCollection1/%s/DirA", backupDirName)), { + NLs::PathExist, + NLs::ChildrenCount(2), + NLs::Finished, + }); + + TestDescribeResult(DescribePath(runtime, Sprintf("/MyRoot/.backups/collections/MyCollection1/%s/DirA/Table2", backupDirName)), { + NLs::PathExist, + NLs::IsTable, + NLs::Finished, + }); + + TestDescribeResult(DescribePath(runtime, Sprintf("/MyRoot/.backups/collections/MyCollection1/%s/DirA/DirB", backupDirName)), { + NLs::PathExist, + NLs::ChildrenCount(1), + NLs::Finished, + }); + + TestDescribeResult(DescribePath(runtime, Sprintf("/MyRoot/.backups/collections/MyCollection1/%s/DirA/DirB/Table3", backupDirName)), { + NLs::PathExist, + NLs::IsTable, + NLs::Finished, + }); + + // TODO: validate no index created + // TODO: validate no stream created + } } diff --git a/ydb/core/tx/schemeshard/ut_helpers/helpers.cpp b/ydb/core/tx/schemeshard/ut_helpers/helpers.cpp index c525a0f677..1000d3f109 100644 --- a/ydb/core/tx/schemeshard/ut_helpers/helpers.cpp +++ b/ydb/core/tx/schemeshard/ut_helpers/helpers.cpp @@ -924,6 +924,7 @@ namespace NSchemeShardUT_Private { GENERIC_HELPERS(CreateBackupCollection, NKikimrSchemeOp::EOperationType::ESchemeOpCreateBackupCollection, &NKikimrSchemeOp::TModifyScheme::MutableCreateBackupCollection) GENERIC_HELPERS(DropBackupCollection, NKikimrSchemeOp::EOperationType::ESchemeOpDropBackupCollection, &NKikimrSchemeOp::TModifyScheme::MutableDropBackupCollection) DROP_BY_PATH_ID_HELPERS(DropBackupCollection, NKikimrSchemeOp::EOperationType::ESchemeOpDropBackupCollection) + GENERIC_HELPERS(BackupBackupCollection, NKikimrSchemeOp::EOperationType::ESchemeOpBackupBackupCollection, &NKikimrSchemeOp::TModifyScheme::MutableBackupBackupCollection) #undef DROP_BY_PATH_ID_HELPERS #undef GENERIC_WITH_ATTRS_HELPERS diff --git a/ydb/core/tx/schemeshard/ut_helpers/helpers.h b/ydb/core/tx/schemeshard/ut_helpers/helpers.h index 2378346fbd..f462cf9872 100644 --- a/ydb/core/tx/schemeshard/ut_helpers/helpers.h +++ b/ydb/core/tx/schemeshard/ut_helpers/helpers.h @@ -299,6 +299,7 @@ namespace NSchemeShardUT_Private { GENERIC_HELPERS(CreateBackupCollection); GENERIC_HELPERS(DropBackupCollection); DROP_BY_PATH_ID_HELPERS(DropBackupCollection); + GENERIC_HELPERS(BackupBackupCollection); #undef DROP_BY_PATH_ID_HELPERS #undef GENERIC_WITH_ATTRS_HELPERS diff --git a/ydb/core/tx/schemeshard/ya.make b/ydb/core/tx/schemeshard/ya.make index 337d4ea650..7b44b323a3 100644 --- a/ydb/core/tx/schemeshard/ya.make +++ b/ydb/core/tx/schemeshard/ya.make @@ -61,10 +61,10 @@ SRCS( operation_queue_timer.h schemeshard.cpp schemeshard__background_cleaning.cpp + schemeshard__background_compaction.cpp schemeshard__backup_collection_common.cpp schemeshard__borrowed_compaction.cpp schemeshard__clean_pathes.cpp - schemeshard__background_compaction.cpp schemeshard__conditional_erase.cpp schemeshard__delete_tablet_reply.cpp schemeshard__describe_scheme.cpp @@ -100,19 +100,20 @@ SRCS( schemeshard__operation_alter_user_attrs.cpp schemeshard__operation_apply_build_index.cpp schemeshard__operation_assign_bsv.cpp + schemeshard__operation_backup_backup_collection.cpp schemeshard__operation_blob_depot.cpp schemeshard__operation_cancel_tx.cpp schemeshard__operation_cansel_build_index.cpp - schemeshard__operation_common.h schemeshard__operation_common.cpp - schemeshard__operation_common_pq.cpp + schemeshard__operation_common.h schemeshard__operation_common_bsv.cpp schemeshard__operation_common_cdc_stream.cpp schemeshard__operation_common_external_data_source.cpp schemeshard__operation_common_external_table.cpp + schemeshard__operation_common_pq.cpp schemeshard__operation_common_resource_pool.cpp - schemeshard__operation_common_subdomain.h schemeshard__operation_common_subdomain.cpp + schemeshard__operation_common_subdomain.h schemeshard__operation_consistent_copy_tables.cpp schemeshard__operation_copy_sequence.cpp schemeshard__operation_copy_table.cpp diff --git a/ydb/core/tx/tx_proxy/schemereq.cpp b/ydb/core/tx/tx_proxy/schemereq.cpp index b05f88f91d..12fe027d2d 100644 --- a/ydb/core/tx/tx_proxy/schemereq.cpp +++ b/ydb/core/tx/tx_proxy/schemereq.cpp @@ -374,6 +374,9 @@ struct TBaseSchemeReq: public TActorBootstrapped<TDerived> { case NKikimrSchemeOp::ESchemeOpDropBackupCollection: return *modifyScheme.MutableDropBackupCollection()->MutableName(); + + case NKikimrSchemeOp::ESchemeOpBackupBackupCollection: + return *modifyScheme.MutableBackupBackupCollection()->MutableName(); } } @@ -749,6 +752,15 @@ struct TBaseSchemeReq: public TActorBootstrapped<TDerived> { } break; } + case NKikimrSchemeOp::ESchemeOpBackupBackupCollection: { + auto toResolve = TPathToResolve(pbModifyScheme.GetOperationType()); + toResolve.Path = workingDir; + auto collectionPath = SplitPath(pbModifyScheme.GetBackupBackupCollection().GetName()); + std::move(collectionPath.begin(), collectionPath.end(), std::back_inserter(toResolve.Path)); + toResolve.RequiredAccess = NACLib::EAccessRights::GenericWrite; + ResolveForACL.push_back(toResolve); + break; + } case NKikimrSchemeOp::ESchemeOpMoveTable: { auto& descr = pbModifyScheme.GetMoveTable(); { |