diff options
author | Innokentii Mokin <innokentii@ydb.tech> | 2024-11-25 18:13:50 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-11-25 18:13:50 +0300 |
commit | 378c678406437e888668d59a0b2a90059d23da01 (patch) | |
tree | 1da60c68d9749b0ee300987fdb5d92d0749b0bb3 | |
parent | 1dd2b997111cdadcb91ad57b6ea05bd62ce15902 (diff) | |
download | ydb-378c678406437e888668d59a0b2a90059d23da01.tar.gz |
Add backup incremental backup collection (#11870)
29 files changed, 422 insertions, 56 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp b/ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp index 40f293a7ad..ffea9daaf3 100644 --- a/ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp @@ -379,6 +379,12 @@ public: break; } + case NKqpProto::TKqpSchemeOperation::kBackupIncremental: { + const auto& modifyScheme = schemeOp.GetBackupIncremental(); + ev->Record.MutableTransaction()->MutableModifyScheme()->CopyFrom(modifyScheme); + break; + } + default: InternalError(TStringBuilder() << "Unexpected scheme operation: " << (ui32) schemeOp.GetOperationCase()); diff --git a/ydb/core/kqp/gateway/kqp_ic_gateway.cpp b/ydb/core/kqp/gateway/kqp_ic_gateway.cpp index ebffe7a298..97d269cf0d 100644 --- a/ydb/core/kqp/gateway/kqp_ic_gateway.cpp +++ b/ydb/core/kqp/gateway/kqp_ic_gateway.cpp @@ -1335,6 +1335,10 @@ public: return NotImplemented<TGenericResult>(); } + TFuture<TGenericResult> BackupIncremental(const TString&, const NYql::TBackupSettings&) override { + return NotImplemented<TGenericResult>(); + } + TFuture<TGenericResult> CreateUser(const TString& cluster, const NYql::TCreateUserSettings& settings) override { using TRequest = TEvTxUserProxy::TEvProposeTransaction; diff --git a/ydb/core/kqp/host/kqp_gateway_proxy.cpp b/ydb/core/kqp/host/kqp_gateway_proxy.cpp index 65d284c5b8..813437d92d 100644 --- a/ydb/core/kqp/host/kqp_gateway_proxy.cpp +++ b/ydb/core/kqp/host/kqp_gateway_proxy.cpp @@ -1401,6 +1401,49 @@ public: } } + TFuture<TGenericResult> BackupIncremental(const TString& cluster, const NYql::TBackupSettings& settings) override { + CHECK_PREPARED_DDL(BackupIncremental); + + try { + if (cluster != SessionCtx->GetCluster()) { + return MakeFuture(ResultFromError<TGenericResult>("Invalid cluster: " + cluster)); + } + + std::pair<TString, TString> pathPair; + if (settings.Name.StartsWith("/")) { + TString error; + if (!NSchemeHelpers::SplitTablePath(settings.Name, GetDatabase(), pathPair, error, true)) { + return MakeFuture(ResultFromError<TGenericResult>(error)); + } + } else { + pathPair.second = ".backups/collections/" + settings.Name; + } + + NKikimrSchemeOp::TModifyScheme tx; + tx.SetWorkingDir(GetDatabase()); + tx.SetOperationType(NKikimrSchemeOp::ESchemeOpBackupIncrementalBackupCollection); + + auto& op = *tx.MutableBackupIncrementalBackupCollection(); + op.SetName(pathPair.second); + + if (IsPrepare()) { + auto& phyQuery = *SessionCtx->Query().PreparingQuery->MutablePhysicalQuery(); + auto& phyTx = *phyQuery.AddTransactions(); + phyTx.SetType(NKqpProto::TKqpPhyTx::TYPE_SCHEME); + phyTx.MutableSchemeOperation()->MutableBackupIncremental()->Swap(&tx); + + TGenericResult result; + result.SetSuccess(); + return MakeFuture(result); + } else { + return Gateway->ModifyScheme(std::move(tx)); + } + } + catch (yexception& e) { + return MakeFuture(ResultFromException<TGenericResult>(e)); + } + } + TFuture<TGenericResult> CreateUser(const TString& cluster, const TCreateUserSettings& settings) override { CHECK_PREPARED_DDL(CreateUser); diff --git a/ydb/core/kqp/provider/yql_kikimr_datasink.cpp b/ydb/core/kqp/provider/yql_kikimr_datasink.cpp index afa1d0cf86..dcb1e40b7a 100644 --- a/ydb/core/kqp/provider/yql_kikimr_datasink.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_datasink.cpp @@ -173,6 +173,12 @@ private: return TStatus::Ok; } + TStatus HandleBackupIncremental(TKiBackupIncremental node, TExprContext& ctx) override { + Y_UNUSED(ctx); + Y_UNUSED(node); + return TStatus::Ok; + } + TStatus HandleCreateUser(TKiCreateUser node, TExprContext& ctx) override { ctx.AddError(TIssue(ctx.GetPosition(node.Pos()), TStringBuilder() << "CreateUser is not yet implemented for intent determination transformer")); @@ -605,7 +611,9 @@ public: return true; } - if (node.IsCallable(TKiBackup::CallableName())) { + if (node.IsCallable(TKiBackup::CallableName()) + || node.IsCallable(TKiBackupIncremental::CallableName()) + ) { return true; } @@ -1515,6 +1523,14 @@ public: .Prefix().Build(key.GetBackupCollectionPath().Prefix) .Done() .Ptr(); + } else if (mode == "backupIncremental") { + return Build<TKiBackupIncremental>(ctx, node->Pos()) + .World(node->Child(0)) + .DataSink(node->Child(1)) + .BackupCollection().Build(key.GetBackupCollectionPath().Name) + .Prefix().Build(key.GetBackupCollectionPath().Prefix) + .Done() + .Ptr(); } else { ctx.AddError(TIssue(ctx.GetPosition(node->Pos()), TStringBuilder() << "Unknown operation type for backup collection: " << TString(mode))); return nullptr; @@ -1790,6 +1806,10 @@ IGraphTransformer::TStatus TKiSinkVisitorTransformer::DoTransform(TExprNode::TPt return HandleBackup(node.Cast(), ctx); } + if (auto node = TMaybeNode<TKiBackupIncremental>(input)) { + return HandleBackupIncremental(node.Cast(), ctx); + } + ctx.AddError(TIssue(ctx.GetPosition(input->Pos()), TStringBuilder() << "(Kikimr DataSink) Unsupported function: " << callable.CallableName())); return TStatus::Error; diff --git a/ydb/core/kqp/provider/yql_kikimr_exec.cpp b/ydb/core/kqp/provider/yql_kikimr_exec.cpp index 5384541314..b2b205be6e 100644 --- a/ydb/core/kqp/provider/yql_kikimr_exec.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_exec.cpp @@ -2616,6 +2616,28 @@ public: }, "Executing BACKUP"); } + if (auto maybeBackupIncremental = TMaybeNode<TKiBackupIncremental>(input)) { + auto requireStatus = RequireChild(*input, 0); + if (requireStatus.Level != TStatus::Ok) { + return SyncStatus(requireStatus); + } + + auto backupIncremental = maybeBackupIncremental.Cast(); + + TBackupSettings settings; + settings.Name = TString(backupIncremental.BackupCollection()); + + auto cluster = TString(backupIncremental.DataSink().Cluster()); + auto future = Gateway->BackupIncremental(cluster, settings); + + return WrapFuture(future, + [](const IKikimrGateway::TGenericResult& res, const TExprNode::TPtr& input, TExprContext& ctx) { + Y_UNUSED(res); + auto resultNode = ctx.NewWorld(input->Pos()); + return resultNode; + }, "Executing BACKUP INCREMENTAL"); + } + ctx.AddError(TIssue(ctx.GetPosition(input->Pos()), TStringBuilder() << "(Kikimr DataSink) Failed to execute node: " << input->Content())); return SyncError(); diff --git a/ydb/core/kqp/provider/yql_kikimr_expr_nodes.json b/ydb/core/kqp/provider/yql_kikimr_expr_nodes.json index 0c34512251..f966e80fd4 100644 --- a/ydb/core/kqp/provider/yql_kikimr_expr_nodes.json +++ b/ydb/core/kqp/provider/yql_kikimr_expr_nodes.json @@ -573,6 +573,17 @@ {"Index": 2, "Name": "BackupCollection", "Type": "TCoAtom"}, {"Index": 3, "Name": "Prefix", "Type": "TCoAtom"} ] + }, + { + "Name": "TKiBackupIncremental", + "Base": "TCallable", + "Match": {"Type": "Callable", "Name": "KiBackupIncremental!"}, + "Children": [ + {"Index": 0, "Name": "World", "Type": "TExprBase"}, + {"Index": 1, "Name": "DataSink", "Type": "TKiDataSink"}, + {"Index": 2, "Name": "BackupCollection", "Type": "TCoAtom"}, + {"Index": 3, "Name": "Prefix", "Type": "TCoAtom"} + ] } ] } diff --git a/ydb/core/kqp/provider/yql_kikimr_gateway.h b/ydb/core/kqp/provider/yql_kikimr_gateway.h index aa61f04f94..86e304fb04 100644 --- a/ydb/core/kqp/provider/yql_kikimr_gateway.h +++ b/ydb/core/kqp/provider/yql_kikimr_gateway.h @@ -1073,6 +1073,8 @@ public: virtual NThreading::TFuture<TGenericResult> Backup(const TString& cluster, const TBackupSettings& settings) = 0; + virtual NThreading::TFuture<TGenericResult> BackupIncremental(const TString& cluster, const TBackupSettings& settings) = 0; + virtual NThreading::TFuture<TGenericResult> CreateUser(const TString& cluster, const TCreateUserSettings& settings) = 0; virtual NThreading::TFuture<TGenericResult> AlterUser(const TString& cluster, const TAlterUserSettings& settings) = 0; diff --git a/ydb/core/kqp/provider/yql_kikimr_provider.cpp b/ydb/core/kqp/provider/yql_kikimr_provider.cpp index bce0f05d5d..bda7775b68 100644 --- a/ydb/core/kqp/provider/yql_kikimr_provider.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_provider.cpp @@ -78,6 +78,7 @@ struct TKikimrData { DataSinkNames.insert(TKiAlterBackupCollection::CallableName()); DataSinkNames.insert(TKiDropBackupCollection::CallableName()); DataSinkNames.insert(TKiBackup::CallableName()); + DataSinkNames.insert(TKiBackupIncremental::CallableName()); CommitModes.insert(CommitModeFlush); CommitModes.insert(CommitModeRollback); @@ -128,7 +129,8 @@ struct TKikimrData { TYdbOperation::CreateBackupCollection | TYdbOperation::AlterBackupCollection | TYdbOperation::DropBackupCollection | - TYdbOperation::Backup; + TYdbOperation::Backup | + TYdbOperation::BackupIncremental; SystemColumns = { {"_yql_partition_id", NKikimr::NUdf::EDataSlot::Uint64} diff --git a/ydb/core/kqp/provider/yql_kikimr_provider.h b/ydb/core/kqp/provider/yql_kikimr_provider.h index 7629f08ba3..2ee003fc03 100644 --- a/ydb/core/kqp/provider/yql_kikimr_provider.h +++ b/ydb/core/kqp/provider/yql_kikimr_provider.h @@ -216,7 +216,7 @@ private: NKikimr::NKqp::TKqpTempTablesState::TConstPtr TempTablesState; }; -enum class TYdbOperation : ui32 { +enum class TYdbOperation : ui64 { CreateTable = 1ull << 0, DropTable = 1ull << 1, AlterTable = 1ull << 2, @@ -249,6 +249,7 @@ enum class TYdbOperation : ui32 { AlterBackupCollection = 1ull << 29, DropBackupCollection = 1ull << 30, Backup = 1ull << 31, + BackupIncremental = 1ull << 32, }; Y_DECLARE_FLAGS(TYdbOperations, TYdbOperation); diff --git a/ydb/core/kqp/provider/yql_kikimr_provider_impl.h b/ydb/core/kqp/provider/yql_kikimr_provider_impl.h index cf926a23a1..a24e899b57 100644 --- a/ydb/core/kqp/provider/yql_kikimr_provider_impl.h +++ b/ydb/core/kqp/provider/yql_kikimr_provider_impl.h @@ -80,6 +80,7 @@ private: virtual TStatus HandleAlterBackupCollection(NNodes::TKiAlterBackupCollection node, TExprContext& ctx) = 0; virtual TStatus HandleDropBackupCollection(NNodes::TKiDropBackupCollection node, TExprContext& ctx) = 0; virtual TStatus HandleBackup(NNodes::TKiBackup node, TExprContext& ctx) = 0; + virtual TStatus HandleBackupIncremental(NNodes::TKiBackupIncremental node, TExprContext& ctx) = 0; }; class TKikimrKey { diff --git a/ydb/core/kqp/provider/yql_kikimr_type_ann.cpp b/ydb/core/kqp/provider/yql_kikimr_type_ann.cpp index ab8b8859b8..6131498239 100644 --- a/ydb/core/kqp/provider/yql_kikimr_type_ann.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_type_ann.cpp @@ -2185,6 +2185,11 @@ virtual TStatus HandleCreateTable(TKiCreateTable create, TExprContext& ctx) over return TStatus::Ok; } + TStatus HandleBackupIncremental(TKiBackupIncremental node, TExprContext&) override { + node.Ptr()->SetTypeAnn(node.World().Ref().GetTypeAnn()); + return TStatus::Ok; + } + private: TIntrusivePtr<IKikimrGateway> Gateway; TIntrusivePtr<TKikimrSessionContext> SessionCtx; diff --git a/ydb/core/protos/flat_scheme_op.proto b/ydb/core/protos/flat_scheme_op.proto index 11e7e9315f..a4930a0b8c 100644 --- a/ydb/core/protos/flat_scheme_op.proto +++ b/ydb/core/protos/flat_scheme_op.proto @@ -1655,8 +1655,11 @@ message TModifyScheme { optional TBackupCollectionDescription AlterBackupCollection = 75; optional TBackupCollectionDescription DropBackupCollection = 76; optional TBackupBackupCollection BackupBackupCollection = 78; + optional TBackupBackupCollection BackupIncrementalBackupCollection = 79; optional TMove MoveSequence = 77; + + // Some entries are grouped by semantics, so are out of order } message TCopySequence { diff --git a/ydb/core/protos/kqp_physical.proto b/ydb/core/protos/kqp_physical.proto index 134618fa9f..8c237161a2 100644 --- a/ydb/core/protos/kqp_physical.proto +++ b/ydb/core/protos/kqp_physical.proto @@ -479,6 +479,7 @@ message TKqpSchemeOperation { NKikimrSchemeOp.TModifyScheme AlterBackupCollection = 46; NKikimrSchemeOp.TModifyScheme DropBackupCollection = 47; NKikimrSchemeOp.TModifyScheme Backup = 48; + NKikimrSchemeOp.TModifyScheme BackupIncremental = 49; } } diff --git a/ydb/core/protos/schemeshard/operations.proto b/ydb/core/protos/schemeshard/operations.proto index 786c4b2b54..b432f893ac 100644 --- a/ydb/core/protos/schemeshard/operations.proto +++ b/ydb/core/protos/schemeshard/operations.proto @@ -162,7 +162,10 @@ enum EOperationType { ESchemeOpAlterBackupCollection = 106; ESchemeOpDropBackupCollection = 107; ESchemeOpBackupBackupCollection = 109; + ESchemeOpBackupIncrementalBackupCollection = 110; // Move sequence ESchemeOpMoveSequence = 108; + + // Some entries are grouped by semantics, so are out of order } diff --git a/ydb/core/tx/schemeshard/schemeshard__op_traits.h b/ydb/core/tx/schemeshard/schemeshard__op_traits.h index 10b463bb69..bad9ca0401 100644 --- a/ydb/core/tx/schemeshard/schemeshard__op_traits.h +++ b/ydb/core/tx/schemeshard/schemeshard__op_traits.h @@ -299,53 +299,64 @@ struct TSchemeTxTraits<NKikimrSchemeOp::EOperationType::ESchemeOpCreateBackupCol constexpr inline static bool CreateDirsFromName = true; }; -template <> -struct TSchemeTxTraits<NKikimrSchemeOp::EOperationType::ESchemeOpBackupBackupCollection> : public TSchemeTxTraitsFallback { - static std::optional<THashMap<TString, THashSet<TString>>> GetRequiredPaths(const TTxTransaction& tx, const TOperationContext& context) { - THashMap<TString, THashSet<TString>> paths; - - const auto& backupOp = tx.GetBackupBackupCollection(); - - const auto& targetDir = backupOp.GetTargetDir(); - const TString& targetPath = JoinPath({tx.GetWorkingDir(), tx.GetBackupBackupCollection().GetName()}); - - const TPath& bcPath = TPath::Resolve(targetPath, context.SS); - { - auto checks = bcPath.Check(); - checks - .NotEmpty() - .NotUnderDomainUpgrade() - .IsAtLocalSchemeShard() - .IsResolved() - .NotUnderDeleting() - .NotUnderOperation() - .IsBackupCollection(); - - if (!checks) { - return {}; - } +inline TString ToX509String(const TInstant& datetime) { + return datetime.FormatLocalTime("%Y%m%d%H%M%SZ"); +} + +inline std::optional<THashMap<TString, THashSet<TString>>> GetRequiredPaths( + const TTxTransaction& tx, + const TString& targetDir, + const TString& targetName, + const TOperationContext& context) +{ + THashMap<TString, THashSet<TString>> paths; + const TString& targetPath = JoinPath({tx.GetWorkingDir(), targetName}); + + const TPath& bcPath = TPath::Resolve(targetPath, context.SS); + { + auto checks = bcPath.Check(); + checks + .NotEmpty() + .NotUnderDomainUpgrade() + .IsAtLocalSchemeShard() + .IsResolved() + .NotUnderDeleting() + .NotUnderOperation() + .IsBackupCollection(); + + if (!checks) { + return {}; } + } - Y_ABORT_UNLESS(context.SS->BackupCollections.contains(bcPath->PathId)); - const auto& bc = context.SS->BackupCollections[bcPath->PathId]; + Y_ABORT_UNLESS(context.SS->BackupCollections.contains(bcPath->PathId)); + const auto& bc = context.SS->BackupCollections[bcPath->PathId]; - auto& collectionPaths = paths[targetPath]; + auto& collectionPaths = paths[targetPath]; + collectionPaths.emplace(targetDir); - for (const auto& item : bc->Description.GetExplicitEntryList().GetEntries()) { - std::pair<TString, TString> paths; - TString err; - if (!TrySplitPathByDb(item.GetPath(), tx.GetWorkingDir(), paths, err)) { - return {}; - } + for (const auto& item : bc->Description.GetExplicitEntryList().GetEntries()) { + std::pair<TString, TString> paths; + TString err; + if (!TrySplitPathByDb(item.GetPath(), tx.GetWorkingDir(), paths, err)) { + return {}; + } - auto pathPieces = SplitPath(paths.second); - if (pathPieces.size() > 1) { - auto parent = ExtractParent(paths.second); - collectionPaths.emplace(JoinPath({targetDir, TString(parent)})); - } + auto pathPieces = SplitPath(paths.second); + if (pathPieces.size() > 1) { + auto parent = ExtractParent(paths.second); + collectionPaths.emplace(JoinPath({targetDir, TString(parent)})); } + } + + return paths; +} - return paths; +template <> +struct TSchemeTxTraits<NKikimrSchemeOp::EOperationType::ESchemeOpBackupBackupCollection> : public TSchemeTxTraitsFallback { + static std::optional<THashMap<TString, THashSet<TString>>> GetRequiredPaths(const TTxTransaction& tx, const TOperationContext& context) { + const auto& backupOp = tx.GetBackupBackupCollection(); + return ::NKikimr::NSchemeShard::GetRequiredPaths(tx, backupOp.GetTargetDir(), backupOp.GetName(), context); } static bool Rewrite(TTxTransaction& tx) { @@ -356,10 +367,23 @@ struct TSchemeTxTraits<NKikimrSchemeOp::EOperationType::ESchemeOpBackupBackupCol constexpr inline static bool CreateAdditionalDirs = true; constexpr inline static bool NeedRewrite = true; -private: - static inline TString ToX509String(const TInstant& datetime) { - return datetime.FormatLocalTime("%Y%m%d%H%M%SZ"); +}; + +template <> +struct TSchemeTxTraits<NKikimrSchemeOp::EOperationType::ESchemeOpBackupIncrementalBackupCollection> : public TSchemeTxTraitsFallback { + static std::optional<THashMap<TString, THashSet<TString>>> GetRequiredPaths(const TTxTransaction& tx, const TOperationContext& context) { + const auto& backupOp = tx.GetBackupIncrementalBackupCollection(); + return ::NKikimr::NSchemeShard::GetRequiredPaths(tx, backupOp.GetTargetDir(), backupOp.GetName(), context); } + + static bool Rewrite(TTxTransaction& tx) { + auto now = ToX509String(TlsActivationContext->AsActorContext().Now()); + tx.MutableBackupIncrementalBackupCollection()->SetTargetDir(now + "_incremental"); + return true; + } + + constexpr inline static bool CreateAdditionalDirs = true; + constexpr inline static bool NeedRewrite = true; }; } // namespace NKikimr::NSchemeShard diff --git a/ydb/core/tx/schemeshard/schemeshard__operation.cpp b/ydb/core/tx/schemeshard/schemeshard__operation.cpp index 0c23626fcb..6547101fb6 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation.cpp @@ -1514,6 +1514,8 @@ TVector<ISubOperation::TPtr> TOperation::ConstructParts(const TTxTransaction& tx case NKikimrSchemeOp::EOperationType::ESchemeOpBackupBackupCollection: return CreateBackupBackupCollection(NextPartId(), tx, context); + case NKikimrSchemeOp::EOperationType::ESchemeOpBackupIncrementalBackupCollection: + return CreateBackupIncrementalBackupCollection(NextPartId(), tx, context); } Y_UNREACHABLE(); diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_alter_cdc_stream.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_alter_cdc_stream.cpp index 9113d5d8ed..46fcb71f75 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_alter_cdc_stream.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_alter_cdc_stream.cpp @@ -362,7 +362,7 @@ public: auto result = MakeHolder<TProposeResponse>(NKikimrScheme::StatusAccepted, ui64(OperationId.GetTxId()), context.SS->TabletID()); - const auto tablePath = TPath::Resolve(workingDir, context.SS).Dive(tableName); + const auto tablePath = TPath::Resolve(workingDir, context.SS).Child(tableName, TPath::TSplitChildTag{}); { const auto checks = tablePath.Check(); checks @@ -487,7 +487,7 @@ std::variant<TStreamPaths, ISubOperation::TPtr> DoAlterStreamPathChecks( const TString& tableName, const TString& streamName) { - const auto tablePath = workingDirPath.Child(tableName); + const auto tablePath = workingDirPath.Child(tableName, TPath::TSplitChildTag{}); { const auto checks = tablePath.Check(); checks diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_alter_continuous_backup.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_alter_continuous_backup.cpp index f47ccec868..7162c6fb93 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_alter_continuous_backup.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_alter_continuous_backup.cpp @@ -66,7 +66,7 @@ void DoCreateIncrBackupTable(const TOperationId& opId, const TPath& dst, NKikimr result.push_back(CreateNewTable(NextPartId(opId, result), outTx)); } -TVector<ISubOperation::TPtr> CreateAlterContinuousBackup(TOperationId opId, const TTxTransaction& tx, TOperationContext& context) { +bool CreateAlterContinuousBackup(TOperationId opId, const TTxTransaction& tx, TOperationContext& context, TVector<ISubOperation::TPtr>& result) { Y_ABORT_UNLESS(tx.GetOperationType() == NKikimrSchemeOp::EOperationType::ESchemeOpAlterContinuousBackup); const auto workingDirPath = TPath::Resolve(tx.GetWorkingDir(), context.SS); @@ -75,7 +75,8 @@ TVector<ISubOperation::TPtr> CreateAlterContinuousBackup(TOperationId opId, cons const auto checksResult = NCdc::DoAlterStreamPathChecks(opId, workingDirPath, tableName, NBackup::CB_CDC_STREAM_NAME); if (std::holds_alternative<ISubOperation::TPtr>(checksResult)) { - return {std::get<ISubOperation::TPtr>(checksResult)}; + result = {std::get<ISubOperation::TPtr>(checksResult)}; + return false; } const auto [tablePath, streamPath] = std::get<NCdc::TStreamPaths>(checksResult); @@ -84,7 +85,7 @@ TVector<ISubOperation::TPtr> CreateAlterContinuousBackup(TOperationId opId, cons const auto topicPath = streamPath.Child("streamImpl"); TTopicInfo::TPtr topic = context.SS->Topics.at(topicPath.Base()->PathId); - const auto backupTablePath = workingDirPath.Child(cbOp.GetTakeIncrementalBackup().GetDstPath()); + const auto backupTablePath = workingDirPath.Child(cbOp.GetTakeIncrementalBackup().GetDstPath(), TPath::TSplitChildTag{}); const NScheme::TTypeRegistry* typeRegistry = AppData(context.Ctx)->TypeRegistry; @@ -94,11 +95,13 @@ TVector<ISubOperation::TPtr> CreateAlterContinuousBackup(TOperationId opId, cons TString errStr; if (!context.SS->CheckApplyIf(tx, errStr)) { - return {CreateReject(opId, NKikimrScheme::StatusPreconditionFailed, errStr)}; + result = {CreateReject(opId, NKikimrScheme::StatusPreconditionFailed, errStr)}; + return false; } if (!context.SS->CheckLocks(tablePath.Base()->PathId, tx, errStr)) { - return {CreateReject(opId, NKikimrScheme::StatusMultipleModifications, errStr)}; + result = {CreateReject(opId, NKikimrScheme::StatusMultipleModifications, errStr)}; + return false; } NKikimrSchemeOp::TAlterCdcStream alterCdcStreamOp; @@ -111,11 +114,11 @@ TVector<ISubOperation::TPtr> CreateAlterContinuousBackup(TOperationId opId, cons alterCdcStreamOp.MutableDisable(); break; default: - return {CreateReject(opId, NKikimrScheme::StatusInvalidParameter, TStringBuilder() + result = {CreateReject(opId, NKikimrScheme::StatusInvalidParameter, TStringBuilder() << "Unknown action: " << static_cast<ui32>(cbOp.GetActionCase()))}; - } - TVector<ISubOperation::TPtr> result; + return false; + } NCdc::DoAlterStream(result, alterCdcStreamOp, opId, workingDirPath, tablePath); @@ -124,6 +127,15 @@ TVector<ISubOperation::TPtr> CreateAlterContinuousBackup(TOperationId opId, cons DoAlterPqPart(opId, backupTablePath, topicPath, topic, result); } + return true; +} + +TVector<ISubOperation::TPtr> CreateAlterContinuousBackup(TOperationId opId, const TTxTransaction& tx, TOperationContext& context) { + TVector<ISubOperation::TPtr> result; + + CreateAlterContinuousBackup(opId, tx, context, result); + + return result; return result; } diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_backup_incremental_backup_collection.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_backup_incremental_backup_collection.cpp new file mode 100644 index 0000000000..22ba02d8d6 --- /dev/null +++ b/ydb/core/tx/schemeshard/schemeshard__operation_backup_incremental_backup_collection.cpp @@ -0,0 +1,66 @@ +#include "schemeshard__operation_common.h" +#include "schemeshard__operation_create_cdc_stream.h" +#include "schemeshard_impl.h" + +#include <ydb/core/tx/schemeshard/backup/constants.h> + +namespace NKikimr::NSchemeShard { + +TVector<ISubOperation::TPtr> CreateBackupIncrementalBackupCollection(TOperationId opId, const TTxTransaction& tx, TOperationContext& context) { + TVector<ISubOperation::TPtr> result; + + TString bcPathStr = JoinPath({tx.GetWorkingDir(), tx.GetBackupIncrementalBackupCollection().GetName()}); + + const TPath& bcPath = TPath::Resolve(bcPathStr, context.SS); + { + auto checks = bcPath.Check(); + checks + .NotEmpty() + .NotUnderDomainUpgrade() + .IsAtLocalSchemeShard() + .IsResolved() + .NotUnderDeleting() + .NotUnderOperation() + .IsBackupCollection(); + + if (!checks) { + result = {CreateReject(opId, checks.GetStatus(), checks.GetError())}; + return result; + } + } + + Y_ABORT_UNLESS(context.SS->BackupCollections.contains(bcPath->PathId)); + const auto& bc = context.SS->BackupCollections[bcPath->PathId]; + bool incrBackupEnabled = bc->Description.HasIncrementalBackupConfig(); + + if (!incrBackupEnabled) { + return {CreateReject(opId, NKikimrScheme::StatusInvalidParameter, "Incremental backup is disabled on this collection")}; + } + + for (const auto& item : bc->Description.GetExplicitEntryList().GetEntries()) { + std::pair<TString, TString> paths; + TString err; + if (!TrySplitPathByDb(item.GetPath(), bcPath.GetDomainPathString(), paths, err)) { + result = {CreateReject(opId, NKikimrScheme::StatusInvalidParameter, err)}; + return {}; + } + auto& relativeItemPath = paths.second; + + NKikimrSchemeOp::TModifyScheme modifyScheme; + modifyScheme.SetWorkingDir(tx.GetWorkingDir()); + modifyScheme.SetOperationType(NKikimrSchemeOp::ESchemeOpAlterContinuousBackup); + modifyScheme.SetInternal(true); + auto& cb = *modifyScheme.MutableAlterContinuousBackup(); + cb.SetTableName(relativeItemPath); + auto& ib = *cb.MutableTakeIncrementalBackup(); + ib.SetDstPath(JoinPath({tx.GetBackupIncrementalBackupCollection().GetName(), tx.GetBackupIncrementalBackupCollection().GetTargetDir(), relativeItemPath})); + + if (!CreateAlterContinuousBackup(opId, modifyScheme, context, result)) { + return result; + } + } + + return result; +} + +} // namespace NKikimr::NSchemeShard diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_part.h b/ydb/core/tx/schemeshard/schemeshard__operation_part.h index 25cb768147..7ed0d7fa8d 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_part.h +++ b/ydb/core/tx/schemeshard/schemeshard__operation_part.h @@ -439,6 +439,7 @@ ISubOperation::TPtr CreateDropCdcStreamAtTable(TOperationId id, TTxState::ETxSta // Create TVector<ISubOperation::TPtr> CreateNewContinuousBackup(TOperationId id, const TTxTransaction& tx, TOperationContext& context); TVector<ISubOperation::TPtr> CreateAlterContinuousBackup(TOperationId id, const TTxTransaction& tx, TOperationContext& context); +bool CreateAlterContinuousBackup(TOperationId id, const TTxTransaction& tx, TOperationContext& context, TVector<ISubOperation::TPtr>& result); TVector<ISubOperation::TPtr> CreateDropContinuousBackup(TOperationId id, const TTxTransaction& tx, TOperationContext& context); ISubOperation::TPtr CreateBackup(TOperationId id, const TTxTransaction& tx); @@ -661,6 +662,7 @@ ISubOperation::TPtr CreateDropBackupCollection(TOperationId id, const TTxTransac ISubOperation::TPtr CreateDropBackupCollection(TOperationId id, TTxState::ETxState state); TVector<ISubOperation::TPtr> CreateBackupBackupCollection(TOperationId opId, const TTxTransaction& tx, TOperationContext& context); +TVector<ISubOperation::TPtr> CreateBackupIncrementalBackupCollection(TOperationId opId, const TTxTransaction& tx, TOperationContext& context); } } diff --git a/ydb/core/tx/schemeshard/schemeshard_audit_log_fragment.cpp b/ydb/core/tx/schemeshard/schemeshard_audit_log_fragment.cpp index 91ccb3430c..aa2ac2448f 100644 --- a/ydb/core/tx/schemeshard/schemeshard_audit_log_fragment.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_audit_log_fragment.cpp @@ -259,6 +259,8 @@ TString DefineUserOperationName(const NKikimrSchemeOp::TModifyScheme& tx) { case NKikimrSchemeOp::EOperationType::ESchemeOpBackupBackupCollection: return "BACKUP"; + case NKikimrSchemeOp::EOperationType::ESchemeOpBackupIncrementalBackupCollection: + return "BACKUP INCREMENTAL"; } Y_ABORT("switch should cover all operation types"); } @@ -584,6 +586,9 @@ TVector<TString> ExtractChangingPaths(const NKikimrSchemeOp::TModifyScheme& tx) case NKikimrSchemeOp::EOperationType::ESchemeOpBackupBackupCollection: result.emplace_back(NKikimr::JoinPath({tx.GetWorkingDir(), tx.GetBackupBackupCollection().GetName()})); break; + case NKikimrSchemeOp::EOperationType::ESchemeOpBackupIncrementalBackupCollection: + result.emplace_back(NKikimr::JoinPath({tx.GetWorkingDir(), tx.GetBackupIncrementalBackupCollection().GetName()})); + break; } return result; diff --git a/ydb/core/tx/schemeshard/schemeshard_path.cpp b/ydb/core/tx/schemeshard/schemeshard_path.cpp index 1a7d7dc53f..16a13b5cdb 100644 --- a/ydb/core/tx/schemeshard/schemeshard_path.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_path.cpp @@ -1307,6 +1307,17 @@ TPath TPath::Child(const TString& name) const { return result; } +TPath TPath::Child(const TString& name, TSplitChildTag) const { + TPath result = *this; + + auto pathParts = SplitPath(name); + for (const auto& part : pathParts) { + result.Dive(part); + } + + return result; +} + TPath TPath::Resolve(const TString path, TSchemeShard* ss) { Y_ABORT_UNLESS(ss); diff --git a/ydb/core/tx/schemeshard/schemeshard_path.h b/ydb/core/tx/schemeshard/schemeshard_path.h index eacfa32526..3ffadedcb2 100644 --- a/ydb/core/tx/schemeshard/schemeshard_path.h +++ b/ydb/core/tx/schemeshard/schemeshard_path.h @@ -106,6 +106,7 @@ public: }; public: + struct TSplitChildTag {}; explicit TPath(TSchemeShard* ss); TPath(TVector<TPathElement::TPtr>&& elements, TSchemeShard* ss); @@ -143,6 +144,7 @@ public: bool IsDomain() const; TPath& Dive(const TString& name); TPath Child(const TString& name) const; + TPath Child(const TString& name, TSplitChildTag) const; TPathElement::TPtr Base() const; TPathElement* operator->() const; bool IsDeleted() const; diff --git a/ydb/core/tx/schemeshard/ut_backup_collection/ut_backup_collection.cpp b/ydb/core/tx/schemeshard/ut_backup_collection/ut_backup_collection.cpp index bb772d5236..2b8d1c79ec 100644 --- a/ydb/core/tx/schemeshard/ut_backup_collection/ut_backup_collection.cpp +++ b/ydb/core/tx/schemeshard/ut_backup_collection/ut_backup_collection.cpp @@ -342,4 +342,46 @@ Y_UNIT_TEST_SUITE(TBackupCollectionTests) { {NKikimrScheme::EStatus::StatusPathDoesNotExist}); env.TestWaitNotification(runtime, txId); } + + Y_UNIT_TEST(BackupNonIncrementalCollection) { + TTestBasicRuntime runtime; + TTestEnv env(runtime, TTestEnvOptions().EnableBackupService(true)); + ui64 txId = 100; + + SetupLogging(runtime); + + PrepareDirs(runtime, env, txId); + + TestCreateBackupCollection(runtime, ++txId, "/MyRoot/.backups/collections/", DefaultCollectionSettings()); + + env.TestWaitNotification(runtime, txId); + + TestDescribeResult(DescribePath(runtime, "/MyRoot/.backups/collections/" DEFAULT_NAME_1), { + NLs::PathExist, + NLs::IsBackupCollection, + }); + + TestCreateTable(runtime, ++txId, "/MyRoot", R"( + Name: "Table1" + Columns { Name: "key" Type: "Utf8" } + KeyColumnNames: ["key"] + )"); + env.TestWaitNotification(runtime, txId); + + TestBackupBackupCollection(runtime, ++txId, "/MyRoot", + R"(Name: ".backups/collections/)" DEFAULT_NAME_1 R"(")"); + env.TestWaitNotification(runtime, txId); + + TestBackupIncrementalBackupCollection(runtime, ++txId, "/MyRoot", + R"(Name: ".backups/collections/)" DEFAULT_NAME_1 R"(")", + {NKikimrScheme::EStatus::StatusInvalidParameter}); + env.TestWaitNotification(runtime, txId); + + TestDescribeResult(DescribePath(runtime, "/MyRoot/.backups/collections/" DEFAULT_NAME_1), { + NLs::PathExist, + NLs::IsBackupCollection, + NLs::ChildrenCount(1), + NLs::Finished, + }); + } } // TBackupCollectionTests diff --git a/ydb/core/tx/schemeshard/ut_base/ut_base.cpp b/ydb/core/tx/schemeshard/ut_base/ut_base.cpp index 170d5f3c29..96b89f4c29 100644 --- a/ydb/core/tx/schemeshard/ut_base/ut_base.cpp +++ b/ydb/core/tx/schemeshard/ut_base/ut_base.cpp @@ -11509,5 +11509,66 @@ Y_UNIT_TEST_SUITE(TSchemeShardTest) { // TODO: validate no index created // TODO: validate no stream created + + if (WithIncremental) { + TestBackupIncrementalBackupCollection(runtime, ++txId, "/MyRoot", R"( + Name: ".backups/collections/MyCollection1" + )"); + + env.TestWaitNotification(runtime, txId); + + TestDescribeResult(DescribePath(runtime, "/MyRoot/.backups/collections/MyCollection1"), { + NLs::PathExist, + NLs::IsBackupCollection, + NLs::ChildrenCount(2), + NLs::Finished, + }); + + auto descr = DescribePath(runtime, "/MyRoot/.backups/collections/MyCollection1").GetPathDescription(); + UNIT_ASSERT_VALUES_EQUAL(descr.GetChildren().size(), 2); + + const char* incrBackupDirName = nullptr; + for (auto& dir : descr.GetChildren()) { + if (dir.GetName().EndsWith("_incremental")) { + incrBackupDirName = dir.GetName().c_str(); + } + } + + TestDescribeResult(DescribePath(runtime, Sprintf("/MyRoot/.backups/collections/MyCollection1/%s", incrBackupDirName)), { + NLs::PathExist, + NLs::ChildrenCount(2), + NLs::Finished, + }); + + TestDescribeResult(DescribePath(runtime, Sprintf("/MyRoot/.backups/collections/MyCollection1/%s/Table1", incrBackupDirName)), { + NLs::PathExist, + NLs::IsTable, + NLs::Finished, + }); + + TestDescribeResult(DescribePath(runtime, Sprintf("/MyRoot/.backups/collections/MyCollection1/%s/DirA", incrBackupDirName)), { + NLs::PathExist, + NLs::ChildrenCount(2), + NLs::Finished, + }); + + TestDescribeResult(DescribePath(runtime, Sprintf("/MyRoot/.backups/collections/MyCollection1/%s/DirA/Table2", incrBackupDirName)), { + NLs::PathExist, + NLs::IsTable, + NLs::Finished, + }); + + TestDescribeResult(DescribePath(runtime, Sprintf("/MyRoot/.backups/collections/MyCollection1/%s/DirA/DirB", incrBackupDirName)), { + NLs::PathExist, + NLs::ChildrenCount(1), + NLs::Finished, + }); + + TestDescribeResult(DescribePath(runtime, Sprintf("/MyRoot/.backups/collections/MyCollection1/%s/DirA/DirB/Table3", incrBackupDirName)), { + NLs::PathExist, + NLs::IsTable, + NLs::Finished, + }); + } } } diff --git a/ydb/core/tx/schemeshard/ut_helpers/helpers.cpp b/ydb/core/tx/schemeshard/ut_helpers/helpers.cpp index 0c7a78ab81..6676c0152b 100644 --- a/ydb/core/tx/schemeshard/ut_helpers/helpers.cpp +++ b/ydb/core/tx/schemeshard/ut_helpers/helpers.cpp @@ -926,6 +926,7 @@ namespace NSchemeShardUT_Private { GENERIC_HELPERS(DropBackupCollection, NKikimrSchemeOp::EOperationType::ESchemeOpDropBackupCollection, &NKikimrSchemeOp::TModifyScheme::MutableDropBackupCollection) DROP_BY_PATH_ID_HELPERS(DropBackupCollection, NKikimrSchemeOp::EOperationType::ESchemeOpDropBackupCollection) GENERIC_HELPERS(BackupBackupCollection, NKikimrSchemeOp::EOperationType::ESchemeOpBackupBackupCollection, &NKikimrSchemeOp::TModifyScheme::MutableBackupBackupCollection) + GENERIC_HELPERS(BackupIncrementalBackupCollection, NKikimrSchemeOp::EOperationType::ESchemeOpBackupIncrementalBackupCollection, &NKikimrSchemeOp::TModifyScheme::MutableBackupIncrementalBackupCollection) #undef DROP_BY_PATH_ID_HELPERS #undef GENERIC_WITH_ATTRS_HELPERS diff --git a/ydb/core/tx/schemeshard/ut_helpers/helpers.h b/ydb/core/tx/schemeshard/ut_helpers/helpers.h index f462cf9872..d64de269f4 100644 --- a/ydb/core/tx/schemeshard/ut_helpers/helpers.h +++ b/ydb/core/tx/schemeshard/ut_helpers/helpers.h @@ -300,6 +300,7 @@ namespace NSchemeShardUT_Private { GENERIC_HELPERS(DropBackupCollection); DROP_BY_PATH_ID_HELPERS(DropBackupCollection); GENERIC_HELPERS(BackupBackupCollection); + GENERIC_HELPERS(BackupIncrementalBackupCollection); #undef DROP_BY_PATH_ID_HELPERS #undef GENERIC_WITH_ATTRS_HELPERS diff --git a/ydb/core/tx/schemeshard/ya.make b/ydb/core/tx/schemeshard/ya.make index 7b44b323a3..b1f82c8019 100644 --- a/ydb/core/tx/schemeshard/ya.make +++ b/ydb/core/tx/schemeshard/ya.make @@ -101,6 +101,7 @@ SRCS( schemeshard__operation_apply_build_index.cpp schemeshard__operation_assign_bsv.cpp schemeshard__operation_backup_backup_collection.cpp + schemeshard__operation_backup_incremental_backup_collection.cpp schemeshard__operation_blob_depot.cpp schemeshard__operation_cancel_tx.cpp schemeshard__operation_cansel_build_index.cpp diff --git a/ydb/core/tx/tx_proxy/schemereq.cpp b/ydb/core/tx/tx_proxy/schemereq.cpp index e8fdd3f992..a910a94739 100644 --- a/ydb/core/tx/tx_proxy/schemereq.cpp +++ b/ydb/core/tx/tx_proxy/schemereq.cpp @@ -379,6 +379,9 @@ struct TBaseSchemeReq: public TActorBootstrapped<TDerived> { case NKikimrSchemeOp::ESchemeOpBackupBackupCollection: return *modifyScheme.MutableBackupBackupCollection()->MutableName(); + + case NKikimrSchemeOp::ESchemeOpBackupIncrementalBackupCollection: + return *modifyScheme.MutableBackupIncrementalBackupCollection()->MutableName(); } } @@ -763,6 +766,15 @@ struct TBaseSchemeReq: public TActorBootstrapped<TDerived> { ResolveForACL.push_back(toResolve); break; } + case NKikimrSchemeOp::ESchemeOpBackupIncrementalBackupCollection: { + auto toResolve = TPathToResolve(pbModifyScheme.GetOperationType()); + toResolve.Path = workingDir; + auto collectionPath = SplitPath(pbModifyScheme.GetBackupIncrementalBackupCollection().GetName()); + std::move(collectionPath.begin(), collectionPath.end(), std::back_inserter(toResolve.Path)); + toResolve.RequiredAccess = NACLib::EAccessRights::GenericWrite; + ResolveForACL.push_back(toResolve); + break; + } case NKikimrSchemeOp::ESchemeOpMoveTable: { auto& descr = pbModifyScheme.GetMoveTable(); { |