aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorInnokentii Mokin <innokentii@ydb.tech>2024-11-20 14:48:54 +0300
committerGitHub <noreply@github.com>2024-11-20 14:48:54 +0300
commitbd7040fb3886ee0a2f61db154b5a79c54f16ef79 (patch)
treef530192720d16a06d91423d684359e7e40841e9e
parent0cc149bcbf3defc46d8df66e99c500ad2f636880 (diff)
downloadydb-bd7040fb3886ee0a2f61db154b5a79c54f16ef79.tar.gz
Add backup backup collection op (#11293)
-rw-r--r--ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp6
-rw-r--r--ydb/core/kqp/gateway/kqp_ic_gateway.cpp4
-rw-r--r--ydb/core/kqp/host/kqp_gateway_proxy.cpp44
-rw-r--r--ydb/core/kqp/provider/yql_kikimr_datasink.cpp22
-rw-r--r--ydb/core/kqp/provider/yql_kikimr_exec.cpp22
-rw-r--r--ydb/core/kqp/provider/yql_kikimr_expr_nodes.json11
-rw-r--r--ydb/core/kqp/provider/yql_kikimr_gateway.h6
-rw-r--r--ydb/core/kqp/provider/yql_kikimr_provider.cpp6
-rw-r--r--ydb/core/kqp/provider/yql_kikimr_provider.h63
-rw-r--r--ydb/core/kqp/provider/yql_kikimr_provider_impl.h1
-rw-r--r--ydb/core/kqp/provider/yql_kikimr_type_ann.cpp5
-rw-r--r--ydb/core/protos/flat_scheme_op.proto10
-rw-r--r--ydb/core/protos/kqp_physical.proto1
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__op_traits.h78
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation.cpp73
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_backup_backup_collection.cpp110
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_consistent_copy_tables.cpp39
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_copy_table.cpp10
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp20
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.h8
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_part.h6
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_audit_log_fragment.cpp7
-rw-r--r--ydb/core/tx/schemeshard/ut_backup_collection/ut_backup_collection.cpp54
-rw-r--r--ydb/core/tx/schemeshard/ut_base/ut_base.cpp134
-rw-r--r--ydb/core/tx/schemeshard/ut_helpers/helpers.cpp1
-rw-r--r--ydb/core/tx/schemeshard/ut_helpers/helpers.h1
-rw-r--r--ydb/core/tx/schemeshard/ya.make9
-rw-r--r--ydb/core/tx/tx_proxy/schemereq.cpp12
28 files changed, 682 insertions, 81 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp b/ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp
index 3c68f35881..a8781780f2 100644
--- a/ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp
@@ -372,6 +372,12 @@ public:
break;
}
+ case NKqpProto::TKqpSchemeOperation::kBackup: {
+ const auto& modifyScheme = schemeOp.GetBackup();
+ ev->Record.MutableTransaction()->MutableModifyScheme()->CopyFrom(modifyScheme);
+ break;
+ }
+
default:
InternalError(TStringBuilder() << "Unexpected scheme operation: "
<< (ui32) schemeOp.GetOperationCase());
diff --git a/ydb/core/kqp/gateway/kqp_ic_gateway.cpp b/ydb/core/kqp/gateway/kqp_ic_gateway.cpp
index 4dc088bfd3..f70e922900 100644
--- a/ydb/core/kqp/gateway/kqp_ic_gateway.cpp
+++ b/ydb/core/kqp/gateway/kqp_ic_gateway.cpp
@@ -1282,6 +1282,10 @@ public:
return NotImplemented<TGenericResult>();
}
+ TFuture<TGenericResult> Backup(const TString&, const NYql::TBackupSettings&) override {
+ return NotImplemented<TGenericResult>();
+ }
+
TFuture<TGenericResult> CreateUser(const TString& cluster, const NYql::TCreateUserSettings& settings) override {
using TRequest = TEvTxUserProxy::TEvProposeTransaction;
diff --git a/ydb/core/kqp/host/kqp_gateway_proxy.cpp b/ydb/core/kqp/host/kqp_gateway_proxy.cpp
index 9688d0aecd..1e68c0dc47 100644
--- a/ydb/core/kqp/host/kqp_gateway_proxy.cpp
+++ b/ydb/core/kqp/host/kqp_gateway_proxy.cpp
@@ -1287,6 +1287,50 @@ public:
}
}
+
+ TFuture<TGenericResult> Backup(const TString& cluster, const NYql::TBackupSettings& settings) override {
+ CHECK_PREPARED_DDL(Backup);
+
+ try {
+ if (cluster != SessionCtx->GetCluster()) {
+ return MakeFuture(ResultFromError<TGenericResult>("Invalid cluster: " + cluster));
+ }
+
+ std::pair<TString, TString> pathPair;
+ if (settings.Name.StartsWith("/")) {
+ TString error;
+ if (!NSchemeHelpers::SplitTablePath(settings.Name, GetDatabase(), pathPair, error, true)) {
+ return MakeFuture(ResultFromError<TGenericResult>(error));
+ }
+ } else {
+ pathPair.second = ".backups/collections/" + settings.Name;
+ }
+
+ NKikimrSchemeOp::TModifyScheme tx;
+ tx.SetWorkingDir(GetDatabase());
+ tx.SetOperationType(NKikimrSchemeOp::ESchemeOpBackupBackupCollection);
+
+ auto& op = *tx.MutableBackupBackupCollection();
+ op.SetName(pathPair.second);
+
+ if (IsPrepare()) {
+ auto& phyQuery = *SessionCtx->Query().PreparingQuery->MutablePhysicalQuery();
+ auto& phyTx = *phyQuery.AddTransactions();
+ phyTx.SetType(NKqpProto::TKqpPhyTx::TYPE_SCHEME);
+ phyTx.MutableSchemeOperation()->MutableBackup()->Swap(&tx);
+
+ TGenericResult result;
+ result.SetSuccess();
+ return MakeFuture(result);
+ } else {
+ return Gateway->ModifyScheme(std::move(tx));
+ }
+ }
+ catch (yexception& e) {
+ return MakeFuture(ResultFromException<TGenericResult>(e));
+ }
+ }
+
TFuture<TGenericResult> CreateUser(const TString& cluster, const TCreateUserSettings& settings) override {
CHECK_PREPARED_DDL(CreateUser);
diff --git a/ydb/core/kqp/provider/yql_kikimr_datasink.cpp b/ydb/core/kqp/provider/yql_kikimr_datasink.cpp
index bd7de4fda8..afa1d0cf86 100644
--- a/ydb/core/kqp/provider/yql_kikimr_datasink.cpp
+++ b/ydb/core/kqp/provider/yql_kikimr_datasink.cpp
@@ -167,6 +167,12 @@ private:
return TStatus::Ok;
}
+ TStatus HandleBackup(TKiBackup node, TExprContext& ctx) override {
+ Y_UNUSED(ctx);
+ Y_UNUSED(node);
+ return TStatus::Ok;
+ }
+
TStatus HandleCreateUser(TKiCreateUser node, TExprContext& ctx) override {
ctx.AddError(TIssue(ctx.GetPosition(node.Pos()), TStringBuilder()
<< "CreateUser is not yet implemented for intent determination transformer"));
@@ -599,6 +605,10 @@ public:
return true;
}
+ if (node.IsCallable(TKiBackup::CallableName())) {
+ return true;
+ }
+
return false;
}
@@ -1497,6 +1507,14 @@ public:
.Build()
.Done()
.Ptr();
+ } else if (mode == "backup") {
+ return Build<TKiBackup>(ctx, node->Pos())
+ .World(node->Child(0))
+ .DataSink(node->Child(1))
+ .BackupCollection().Build(key.GetBackupCollectionPath().Name)
+ .Prefix().Build(key.GetBackupCollectionPath().Prefix)
+ .Done()
+ .Ptr();
} else {
ctx.AddError(TIssue(ctx.GetPosition(node->Pos()), TStringBuilder() << "Unknown operation type for backup collection: " << TString(mode)));
return nullptr;
@@ -1768,6 +1786,10 @@ IGraphTransformer::TStatus TKiSinkVisitorTransformer::DoTransform(TExprNode::TPt
return HandleDropBackupCollection(node.Cast(), ctx);
}
+ if (auto node = TMaybeNode<TKiBackup>(input)) {
+ return HandleBackup(node.Cast(), ctx);
+ }
+
ctx.AddError(TIssue(ctx.GetPosition(input->Pos()), TStringBuilder() << "(Kikimr DataSink) Unsupported function: "
<< callable.CallableName()));
return TStatus::Error;
diff --git a/ydb/core/kqp/provider/yql_kikimr_exec.cpp b/ydb/core/kqp/provider/yql_kikimr_exec.cpp
index 189bba353c..32797710cd 100644
--- a/ydb/core/kqp/provider/yql_kikimr_exec.cpp
+++ b/ydb/core/kqp/provider/yql_kikimr_exec.cpp
@@ -2592,6 +2592,28 @@ public:
}, "Executing DROP BACKUP COLLECTION");
}
+ if (auto maybeBackup = TMaybeNode<TKiBackup>(input)) {
+ auto requireStatus = RequireChild(*input, 0);
+ if (requireStatus.Level != TStatus::Ok) {
+ return SyncStatus(requireStatus);
+ }
+
+ auto backup = maybeBackup.Cast();
+
+ TBackupSettings settings;
+ settings.Name = TString(backup.BackupCollection());
+
+ auto cluster = TString(backup.DataSink().Cluster());
+ auto future = Gateway->Backup(cluster, settings);
+
+ return WrapFuture(future,
+ [](const IKikimrGateway::TGenericResult& res, const TExprNode::TPtr& input, TExprContext& ctx) {
+ Y_UNUSED(res);
+ auto resultNode = ctx.NewWorld(input->Pos());
+ return resultNode;
+ }, "Executing BACKUP");
+ }
+
ctx.AddError(TIssue(ctx.GetPosition(input->Pos()), TStringBuilder()
<< "(Kikimr DataSink) Failed to execute node: " << input->Content()));
return SyncError();
diff --git a/ydb/core/kqp/provider/yql_kikimr_expr_nodes.json b/ydb/core/kqp/provider/yql_kikimr_expr_nodes.json
index e92b574926..0c34512251 100644
--- a/ydb/core/kqp/provider/yql_kikimr_expr_nodes.json
+++ b/ydb/core/kqp/provider/yql_kikimr_expr_nodes.json
@@ -562,6 +562,17 @@
{"Index": 3, "Name": "Prefix", "Type": "TCoAtom"},
{"Index": 4, "Name": "Cascade", "Type": "TCoAtom"}
]
+ },
+ {
+ "Name": "TKiBackup",
+ "Base": "TCallable",
+ "Match": {"Type": "Callable", "Name": "KiBackup!"},
+ "Children": [
+ {"Index": 0, "Name": "World", "Type": "TExprBase"},
+ {"Index": 1, "Name": "DataSink", "Type": "TKiDataSink"},
+ {"Index": 2, "Name": "BackupCollection", "Type": "TCoAtom"},
+ {"Index": 3, "Name": "Prefix", "Type": "TCoAtom"}
+ ]
}
]
}
diff --git a/ydb/core/kqp/provider/yql_kikimr_gateway.h b/ydb/core/kqp/provider/yql_kikimr_gateway.h
index 9a04060f00..de29871b07 100644
--- a/ydb/core/kqp/provider/yql_kikimr_gateway.h
+++ b/ydb/core/kqp/provider/yql_kikimr_gateway.h
@@ -876,6 +876,10 @@ struct TDropBackupCollectionSettings {
bool Cascade = false;
};
+struct TBackupSettings {
+ TString Name;
+};
+
struct TKikimrListPathItem {
TKikimrListPathItem(TString name, bool isDirectory) {
Name = name;
@@ -1066,6 +1070,8 @@ public:
virtual NThreading::TFuture<TGenericResult> DropBackupCollection(const TString& cluster, const TDropBackupCollectionSettings& settings) = 0;
+ virtual NThreading::TFuture<TGenericResult> Backup(const TString& cluster, const TBackupSettings& settings) = 0;
+
virtual NThreading::TFuture<TGenericResult> CreateUser(const TString& cluster, const TCreateUserSettings& settings) = 0;
virtual NThreading::TFuture<TGenericResult> AlterUser(const TString& cluster, const TAlterUserSettings& settings) = 0;
diff --git a/ydb/core/kqp/provider/yql_kikimr_provider.cpp b/ydb/core/kqp/provider/yql_kikimr_provider.cpp
index 24695052ff..bce0f05d5d 100644
--- a/ydb/core/kqp/provider/yql_kikimr_provider.cpp
+++ b/ydb/core/kqp/provider/yql_kikimr_provider.cpp
@@ -77,6 +77,7 @@ struct TKikimrData {
DataSinkNames.insert(TKiCreateBackupCollection::CallableName());
DataSinkNames.insert(TKiAlterBackupCollection::CallableName());
DataSinkNames.insert(TKiDropBackupCollection::CallableName());
+ DataSinkNames.insert(TKiBackup::CallableName());
CommitModes.insert(CommitModeFlush);
CommitModes.insert(CommitModeRollback);
@@ -126,7 +127,8 @@ struct TKikimrData {
TYdbOperation::ModifyPermission |
TYdbOperation::CreateBackupCollection |
TYdbOperation::AlterBackupCollection |
- TYdbOperation::DropBackupCollection;
+ TYdbOperation::DropBackupCollection |
+ TYdbOperation::Backup;
SystemColumns = {
{"_yql_partition_id", NKikimr::NUdf::EDataSlot::Uint64}
@@ -438,7 +440,7 @@ bool TKikimrKey::Extract(const TExprNode& key) {
KeyType = Type::PGObject;
Target = key.Child(0)->Child(1)->Child(0)->Content();
ObjectType = key.Child(0)->Child(2)->Child(0)->Content();
- } else if (tagName == "backupCollection") {
+ } else if (tagName == "backupCollection" || tagName == "backup") {
KeyType = Type::BackupCollection;
Target = key.Child(0)->Child(1)->Child(0)->Content();
ExplicitPrefix = key.Child(0)->Child(2)->Child(0)->Content();
diff --git a/ydb/core/kqp/provider/yql_kikimr_provider.h b/ydb/core/kqp/provider/yql_kikimr_provider.h
index c4b734d93c..7629f08ba3 100644
--- a/ydb/core/kqp/provider/yql_kikimr_provider.h
+++ b/ydb/core/kqp/provider/yql_kikimr_provider.h
@@ -217,37 +217,38 @@ private:
};
enum class TYdbOperation : ui32 {
- CreateTable = 1 << 0,
- DropTable = 1 << 1,
- AlterTable = 1 << 2,
- Select = 1 << 3,
- Upsert = 1 << 4,
- Replace = 1 << 5,
- Update = 1 << 6,
- Delete = 1 << 7,
- InsertRevert = 1 << 8,
- InsertAbort = 1 << 9,
- ReservedInsertIgnore = 1 << 10,
- UpdateOn = 1 << 11,
- DeleteOn = 1 << 12,
- CreateUser = 1 << 13,
- AlterUser = 1 << 14,
- DropUser = 1 << 15,
- CreateGroup = 1 << 16,
- AlterGroup = 1 << 17,
- DropGroup = 1 << 18,
- CreateTopic = 1 << 19,
- AlterTopic = 1 << 20,
- DropTopic = 1 << 21,
- ModifyPermission = 1 << 22,
- RenameGroup = 1 << 23,
- CreateReplication = 1 << 24,
- AlterReplication = 1 << 25,
- DropReplication = 1 << 26,
- Analyze = 1 << 27,
- CreateBackupCollection = 1 << 28,
- AlterBackupCollection = 1 << 29,
- DropBackupCollection = 1 << 30,
+ CreateTable = 1ull << 0,
+ DropTable = 1ull << 1,
+ AlterTable = 1ull << 2,
+ Select = 1ull << 3,
+ Upsert = 1ull << 4,
+ Replace = 1ull << 5,
+ Update = 1ull << 6,
+ Delete = 1ull << 7,
+ InsertRevert = 1ull << 8,
+ InsertAbort = 1ull << 9,
+ ReservedInsertIgnore = 1ull << 10,
+ UpdateOn = 1ull << 11,
+ DeleteOn = 1ull << 12,
+ CreateUser = 1ull << 13,
+ AlterUser = 1ull << 14,
+ DropUser = 1ull << 15,
+ CreateGroup = 1ull << 16,
+ AlterGroup = 1ull << 17,
+ DropGroup = 1ull << 18,
+ CreateTopic = 1ull << 19,
+ AlterTopic = 1ull << 20,
+ DropTopic = 1ull << 21,
+ ModifyPermission = 1ull << 22,
+ RenameGroup = 1ull << 23,
+ CreateReplication = 1ull << 24,
+ AlterReplication = 1ull << 25,
+ DropReplication = 1ull << 26,
+ Analyze = 1ull << 27,
+ CreateBackupCollection = 1ull << 28,
+ AlterBackupCollection = 1ull << 29,
+ DropBackupCollection = 1ull << 30,
+ Backup = 1ull << 31,
};
Y_DECLARE_FLAGS(TYdbOperations, TYdbOperation);
diff --git a/ydb/core/kqp/provider/yql_kikimr_provider_impl.h b/ydb/core/kqp/provider/yql_kikimr_provider_impl.h
index 099a90eb3f..cf926a23a1 100644
--- a/ydb/core/kqp/provider/yql_kikimr_provider_impl.h
+++ b/ydb/core/kqp/provider/yql_kikimr_provider_impl.h
@@ -79,6 +79,7 @@ private:
virtual TStatus HandleCreateBackupCollection(NNodes::TKiCreateBackupCollection node, TExprContext& ctx) = 0;
virtual TStatus HandleAlterBackupCollection(NNodes::TKiAlterBackupCollection node, TExprContext& ctx) = 0;
virtual TStatus HandleDropBackupCollection(NNodes::TKiDropBackupCollection node, TExprContext& ctx) = 0;
+ virtual TStatus HandleBackup(NNodes::TKiBackup node, TExprContext& ctx) = 0;
};
class TKikimrKey {
diff --git a/ydb/core/kqp/provider/yql_kikimr_type_ann.cpp b/ydb/core/kqp/provider/yql_kikimr_type_ann.cpp
index da26a6afa2..ab8b8859b8 100644
--- a/ydb/core/kqp/provider/yql_kikimr_type_ann.cpp
+++ b/ydb/core/kqp/provider/yql_kikimr_type_ann.cpp
@@ -2180,6 +2180,11 @@ virtual TStatus HandleCreateTable(TKiCreateTable create, TExprContext& ctx) over
return TStatus::Ok;
}
+ TStatus HandleBackup(TKiBackup node, TExprContext&) override {
+ node.Ptr()->SetTypeAnn(node.World().Ref().GetTypeAnn());
+ return TStatus::Ok;
+ }
+
private:
TIntrusivePtr<IKikimrGateway> Gateway;
TIntrusivePtr<TKikimrSessionContext> SessionCtx;
diff --git a/ydb/core/protos/flat_scheme_op.proto b/ydb/core/protos/flat_scheme_op.proto
index e04b7bbff6..1ddfa0ccdb 100644
--- a/ydb/core/protos/flat_scheme_op.proto
+++ b/ydb/core/protos/flat_scheme_op.proto
@@ -318,6 +318,7 @@ message TTableDescription {
// on table creation to allow system column names (started with __ydb_)
// It won't be present on describes and won't be preserved
optional bool SystemColumnNamesAllowed = 42;
+ optional bool AllowUnderSameOperation = 44 [default = false];
}
message TDictionaryEncodingSettings {
@@ -1070,6 +1071,7 @@ message TCopyTableConfig { //TTableDescription implemets copying a table in orig
// additionally creates cdc stream on src table consistently with taking snapshot
optional TCreateCdcStream CreateSrcCdcStream = 6;
+ optional bool AllowUnderSameOperation = 7 [default = false];
}
message TConsistentTableCopyingConfig {
@@ -1617,6 +1619,7 @@ enum EOperationType {
ESchemeOpCreateBackupCollection = 105;
ESchemeOpAlterBackupCollection = 106;
ESchemeOpDropBackupCollection = 107;
+ ESchemeOpBackupBackupCollection = 109;
// Move sequence
ESchemeOpMoveSequence = 108;
@@ -1807,6 +1810,7 @@ message TModifyScheme {
optional TBackupCollectionDescription CreateBackupCollection = 74;
optional TBackupCollectionDescription AlterBackupCollection = 75;
optional TBackupCollectionDescription DropBackupCollection = 76;
+ optional TBackupBackupCollection BackupBackupCollection = 78;
optional TMove MoveSequence = 77;
}
@@ -2199,3 +2203,9 @@ message TBackupCollectionDescription {
google.protobuf.Empty Cluster = 7;
}
}
+
+message TBackupBackupCollection {
+ optional string Name = 1;
+ optional NKikimrProto.TPathID PathId = 2;
+ optional string TargetDir = 3; // must be set on Rewrite
+}
diff --git a/ydb/core/protos/kqp_physical.proto b/ydb/core/protos/kqp_physical.proto
index dede8af2c6..134618fa9f 100644
--- a/ydb/core/protos/kqp_physical.proto
+++ b/ydb/core/protos/kqp_physical.proto
@@ -478,6 +478,7 @@ message TKqpSchemeOperation {
NKikimrSchemeOp.TModifyScheme CreateBackupCollection = 45;
NKikimrSchemeOp.TModifyScheme AlterBackupCollection = 46;
NKikimrSchemeOp.TModifyScheme DropBackupCollection = 47;
+ NKikimrSchemeOp.TModifyScheme Backup = 48;
}
}
diff --git a/ydb/core/tx/schemeshard/schemeshard__op_traits.h b/ydb/core/tx/schemeshard/schemeshard__op_traits.h
index 49f6777d09..10b463bb69 100644
--- a/ydb/core/tx/schemeshard/schemeshard__op_traits.h
+++ b/ydb/core/tx/schemeshard/schemeshard__op_traits.h
@@ -1,5 +1,9 @@
#pragma once
+#include "schemeshard_path.h"
+#include "schemeshard_impl.h"
+#include "schemeshard__operation.h"
+
#include <ydb/core/protos/flat_scheme_op.pb.h>
#include <util/generic/string.h>
@@ -22,14 +26,19 @@ struct TSchemeTxTraitsFallback {
return false;
}
- static THashMap<TString, THashSet<TString>> GetRequiredPaths(const TTxTransaction& tx) {
- Y_UNUSED(tx);
+ static std::optional<THashMap<TString, THashSet<TString>>> GetRequiredPaths(const TTxTransaction& tx, const TOperationContext& context) {
+ Y_UNUSED(tx, context);
return {};
}
- constexpr inline static bool CreateDirsFromName = false;
+ static bool Rewrite(TTxTransaction& tx) {
+ Y_UNUSED(tx);
+ return false;
+ }
+ constexpr inline static bool CreateDirsFromName = false;
constexpr inline static bool CreateAdditionalDirs = false;
+ constexpr inline static bool NeedRewrite = false;
};
template <NKikimrSchemeOp::EOperationType opType>
@@ -290,4 +299,67 @@ struct TSchemeTxTraits<NKikimrSchemeOp::EOperationType::ESchemeOpCreateBackupCol
constexpr inline static bool CreateDirsFromName = true;
};
+template <>
+struct TSchemeTxTraits<NKikimrSchemeOp::EOperationType::ESchemeOpBackupBackupCollection> : public TSchemeTxTraitsFallback {
+ static std::optional<THashMap<TString, THashSet<TString>>> GetRequiredPaths(const TTxTransaction& tx, const TOperationContext& context) {
+ THashMap<TString, THashSet<TString>> paths;
+
+ const auto& backupOp = tx.GetBackupBackupCollection();
+
+ const auto& targetDir = backupOp.GetTargetDir();
+ const TString& targetPath = JoinPath({tx.GetWorkingDir(), tx.GetBackupBackupCollection().GetName()});
+
+ const TPath& bcPath = TPath::Resolve(targetPath, context.SS);
+ {
+ auto checks = bcPath.Check();
+ checks
+ .NotEmpty()
+ .NotUnderDomainUpgrade()
+ .IsAtLocalSchemeShard()
+ .IsResolved()
+ .NotUnderDeleting()
+ .NotUnderOperation()
+ .IsBackupCollection();
+
+ if (!checks) {
+ return {};
+ }
+ }
+
+ Y_ABORT_UNLESS(context.SS->BackupCollections.contains(bcPath->PathId));
+ const auto& bc = context.SS->BackupCollections[bcPath->PathId];
+
+ auto& collectionPaths = paths[targetPath];
+
+ for (const auto& item : bc->Description.GetExplicitEntryList().GetEntries()) {
+ std::pair<TString, TString> paths;
+ TString err;
+ if (!TrySplitPathByDb(item.GetPath(), tx.GetWorkingDir(), paths, err)) {
+ return {};
+ }
+
+ auto pathPieces = SplitPath(paths.second);
+ if (pathPieces.size() > 1) {
+ auto parent = ExtractParent(paths.second);
+ collectionPaths.emplace(JoinPath({targetDir, TString(parent)}));
+ }
+ }
+
+ return paths;
+ }
+
+ static bool Rewrite(TTxTransaction& tx) {
+ auto now = ToX509String(TlsActivationContext->AsActorContext().Now());
+ tx.MutableBackupBackupCollection()->SetTargetDir(now + "_full");
+ return true;
+ }
+
+ constexpr inline static bool CreateAdditionalDirs = true;
+ constexpr inline static bool NeedRewrite = true;
+private:
+ static inline TString ToX509String(const TInstant& datetime) {
+ return datetime.FormatLocalTime("%Y%m%d%H%M%SZ");
+ }
+};
+
} // namespace NKikimr::NSchemeShard
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation.cpp b/ydb/core/tx/schemeshard/schemeshard__operation.cpp
index 2cd9a53027..2d7c65dff3 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation.cpp
@@ -176,6 +176,7 @@ bool TSchemeShard::ProcessOperationParts(
}
THolder<TProposeResponse> TSchemeShard::IgniteOperation(TProposeRequest& request, TOperationContext& context) {
+ using namespace NGenerated;
THolder<TProposeResponse> response = nullptr;
auto selfId = SelfTabletId();
@@ -211,13 +212,33 @@ THolder<TProposeResponse> TSchemeShard::IgniteOperation(TProposeRequest& request
}
}
+ //
+
+ TVector<TTxTransaction> rewrittenTransactions;
+
+ // # Phase Zero
+ // Rewrites transactions.
+ // It may fill or clear particular fields based on some runtime SS state.
+
+ for (auto tx : record.GetTransaction()) {
+ if (DispatchOp(tx, [&](auto traits) { return traits.NeedRewrite && !traits.Rewrite(tx); })) {
+ response.Reset(new TProposeResponse(NKikimrScheme::StatusPreconditionFailed, ui64(txId), ui64(selfId)));
+ response->SetError(NKikimrScheme::StatusPreconditionFailed, "Invalid schema rewrite rule.");
+ return std::move(response);
+ }
+
+ rewrittenTransactions.push_back(std::move(tx));
+ }
+
+ //
+
TVector<TTxTransaction> transactions;
TVector<TTxTransaction> generatedTransactions;
// # Phase One
// Generate MkDir transactions based on object name.
- for (const auto& transaction : record.GetTransaction()) {
+ for (const auto& transaction : rewrittenTransactions) {
auto splitResult = operation->SplitIntoTransactions(transaction, context);
if (splitResult.Status != NKikimrScheme::StatusSuccess) {
response.Reset(new TProposeResponse(splitResult.Status, ui64(txId), ui64(selfId)));
@@ -261,6 +282,8 @@ THolder<TProposeResponse> TSchemeShard::IgniteOperation(TProposeRequest& request
}
}
+ //
+
return std::move(response);
}
@@ -730,7 +753,7 @@ NTabletFlatExecutor::ITransaction* TSchemeShard::CreateTxOperationReply(TOperati
TString JoinPath(const TString& workingDir, const TString& name) {
- Y_ABORT_UNLESS(!name.StartsWith('/') && !name.EndsWith('/'));
+ Y_ABORT_UNLESS(!name.StartsWith('/') && !name.EndsWith('/'), "%s", name.c_str());
return TStringBuilder()
<< workingDir
<< (workingDir.EndsWith('/') ? "" : "/")
@@ -957,29 +980,31 @@ TOperation::TSplitTransactionsResult TOperation::SplitIntoTransactions(const TTx
// # Generates MkDirs based on transaction-specific requirements
if (DispatchOp(tx, [&](auto traits) { return traits.CreateAdditionalDirs; })) {
- for (const auto& [parentPathStr, pathStrs] : DispatchOp(tx, [&](auto traits) { return traits.GetRequiredPaths(tx); })) {
- const TPath parentPath = TPath::Resolve(parentPathStr, context.SS);
- {
- TPath::TChecker checks = parentPath.Check();
- checks
- .NotUnderDomainUpgrade()
- .IsAtLocalSchemeShard()
- .IsResolved()
- .NotDeleted()
- .NotUnderDeleting()
- .IsCommonSensePath()
- .IsLikeDirectory();
-
- if (!checks) {
- result.Transaction = tx;
- return result;
+ if (auto requiredPaths = DispatchOp(tx, [&](auto traits) { return traits.GetRequiredPaths(tx, context); }); requiredPaths) {
+ for (const auto& [parentPathStr, pathStrs] : *requiredPaths) {
+ const TPath parentPath = TPath::Resolve(parentPathStr, context.SS);
+ {
+ TPath::TChecker checks = parentPath.Check();
+ checks
+ .NotUnderDomainUpgrade()
+ .IsAtLocalSchemeShard()
+ .IsResolved()
+ .NotDeleted()
+ .NotUnderDeleting()
+ .IsCommonSensePath()
+ .IsLikeDirectory();
+
+ if (!checks) {
+ result.Transaction = tx;
+ return result;
+ }
}
- }
- for (const auto& pathStr : pathStrs) {
- TPath path = TPath::Resolve(JoinPath(parentPathStr, pathStr), context.SS);
- if (!CreateDirs(tx, parentPath, path, createdPaths, result)) {
- return result;
+ for (const auto& pathStr : pathStrs) {
+ TPath path = TPath::Resolve(JoinPath(parentPathStr, pathStr), context.SS);
+ if (!CreateDirs(tx, parentPath, path, createdPaths, result)) {
+ return result;
+ }
}
}
}
@@ -1483,6 +1508,8 @@ TVector<ISubOperation::TPtr> TOperation::ConstructParts(const TTxTransaction& tx
case NKikimrSchemeOp::EOperationType::ESchemeOpDropBackupCollection:
return {CreateDropBackupCollection(NextPartId(), tx)};
+ case NKikimrSchemeOp::EOperationType::ESchemeOpBackupBackupCollection:
+ return CreateBackupBackupCollection(NextPartId(), tx, context);
}
Y_UNREACHABLE();
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_backup_backup_collection.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_backup_backup_collection.cpp
new file mode 100644
index 0000000000..77e25089ea
--- /dev/null
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_backup_backup_collection.cpp
@@ -0,0 +1,110 @@
+#include "schemeshard__operation_common.h"
+#include "schemeshard__operation_create_cdc_stream.h"
+#include "schemeshard_impl.h"
+
+#include <ydb/core/tx/schemeshard/backup/constants.h>
+
+namespace NKikimr::NSchemeShard {
+
+TVector<ISubOperation::TPtr> CreateBackupBackupCollection(TOperationId opId, const TTxTransaction& tx, TOperationContext& context) {
+ TVector<ISubOperation::TPtr> result;
+
+ NKikimrSchemeOp::TModifyScheme modifyScheme;
+ modifyScheme.SetOperationType(NKikimrSchemeOp::ESchemeOpCreateConsistentCopyTables);
+ modifyScheme.SetInternal(true);
+
+ auto& cct = *modifyScheme.MutableCreateConsistentCopyTables();
+ auto& copyTables = *cct.MutableCopyTableDescriptions();
+ const auto workingDirPath = TPath::Resolve(tx.GetWorkingDir(), context.SS);
+
+ TString bcPathStr = JoinPath({tx.GetWorkingDir(), tx.GetBackupBackupCollection().GetName()});
+
+ const TPath& bcPath = TPath::Resolve(bcPathStr, context.SS);
+ {
+ auto checks = bcPath.Check();
+ checks
+ .NotEmpty()
+ .NotUnderDomainUpgrade()
+ .IsAtLocalSchemeShard()
+ .IsResolved()
+ .NotUnderDeleting()
+ .NotUnderOperation()
+ .IsBackupCollection();
+
+ if (!checks) {
+ result = {CreateReject(opId, checks.GetStatus(), checks.GetError())};
+ return result;
+ }
+ }
+
+ Y_ABORT_UNLESS(context.SS->BackupCollections.contains(bcPath->PathId));
+ const auto& bc = context.SS->BackupCollections[bcPath->PathId];
+ bool incrBackupEnabled = bc->Description.HasIncrementalBackupConfig();
+
+ for (const auto& item : bc->Description.GetExplicitEntryList().GetEntries()) {
+ auto& desc = *copyTables.Add();
+ desc.SetSrcPath(item.GetPath());
+ std::pair<TString, TString> paths;
+ TString err;
+ if (!TrySplitPathByDb(item.GetPath(), bcPath.GetDomainPathString(), paths, err)) {
+ result = {CreateReject(opId, NKikimrScheme::StatusInvalidParameter, err)};
+ return {};
+ }
+ auto& relativeItemPath = paths.second;
+ desc.SetDstPath(JoinPath({tx.GetWorkingDir(), tx.GetBackupBackupCollection().GetName(), tx.GetBackupBackupCollection().GetTargetDir(), relativeItemPath}));
+ desc.SetOmitIndexes(true);
+ desc.SetOmitFollowers(true);
+ desc.SetAllowUnderSameOperation(true);
+
+ if (incrBackupEnabled) {
+ NKikimrSchemeOp::TCreateCdcStream createCdcStreamOp;
+ createCdcStreamOp.SetTableName(item.GetPath());
+ auto& streamDescription = *createCdcStreamOp.MutableStreamDescription();
+ streamDescription.SetName(NBackup::CB_CDC_STREAM_NAME);
+ streamDescription.SetMode(NKikimrSchemeOp::ECdcStreamModeUpdate);
+ streamDescription.SetFormat(NKikimrSchemeOp::ECdcStreamFormatProto);
+
+ const auto sPath = TPath::Resolve(item.GetPath(), context.SS);
+ NCdc::DoCreateStreamImpl(result, createCdcStreamOp, opId, sPath, false, false);
+
+ desc.MutableCreateSrcCdcStream()->CopyFrom(createCdcStreamOp);
+ }
+ }
+
+ if (!CreateConsistentCopyTables(opId, modifyScheme, context, result)) {
+ return result;
+ }
+
+ if (incrBackupEnabled) {
+ for (const auto& item : bc->Description.GetExplicitEntryList().GetEntries()) {
+ NKikimrSchemeOp::TCreateCdcStream createCdcStreamOp;
+ createCdcStreamOp.SetTableName(item.GetPath());
+ auto& streamDescription = *createCdcStreamOp.MutableStreamDescription();
+ streamDescription.SetName(NBackup::CB_CDC_STREAM_NAME);
+ streamDescription.SetMode(NKikimrSchemeOp::ECdcStreamModeUpdate);
+ streamDescription.SetFormat(NKikimrSchemeOp::ECdcStreamFormatProto);
+
+ const auto sPath = TPath::Resolve(item.GetPath(), context.SS);
+ auto table = context.SS->Tables.at(sPath.Base()->PathId);
+
+ TVector<TString> boundaries;
+ const auto& partitions = table->GetPartitions();
+ boundaries.reserve(partitions.size() - 1);
+
+ for (ui32 i = 0; i < partitions.size(); ++i) {
+ const auto& partition = partitions.at(i);
+ if (i != partitions.size() - 1) {
+ boundaries.push_back(partition.EndOfRange);
+ }
+ }
+
+ const auto streamPath = sPath.Child(NBackup::CB_CDC_STREAM_NAME);
+
+ NCdc::DoCreatePqPart(result, createCdcStreamOp, opId, streamPath, NBackup::CB_CDC_STREAM_NAME, table, boundaries, false);
+ }
+ }
+
+ return result;
+}
+
+} // namespace NKikimr::NSchemeShard
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_consistent_copy_tables.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_consistent_copy_tables.cpp
index c9b3a1ab0c..a50c0e82ea 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_consistent_copy_tables.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_consistent_copy_tables.cpp
@@ -9,7 +9,7 @@
#include <util/generic/algorithm.h>
-NKikimrSchemeOp::TModifyScheme CopyTableTask(NKikimr::NSchemeShard::TPath& src, NKikimr::NSchemeShard::TPath& dst, bool omitFollowers, bool isBackup) {
+NKikimrSchemeOp::TModifyScheme CopyTableTask(NKikimr::NSchemeShard::TPath& src, NKikimr::NSchemeShard::TPath& dst, bool omitFollowers, bool isBackup, bool allowUnderSameOp) {
using namespace NKikimr::NSchemeShard;
auto scheme = TransactionTemplate(dst.Parent().PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpCreateTable);
@@ -20,6 +20,7 @@ NKikimrSchemeOp::TModifyScheme CopyTableTask(NKikimr::NSchemeShard::TPath& src,
operation->SetCopyFromTable(src.PathString());
operation->SetOmitFollowers(omitFollowers);
operation->SetIsBackup(isBackup);
+ operation->SetAllowUnderSameOperation(allowUnderSameOp);
return scheme;
}
@@ -47,14 +48,20 @@ NKikimrSchemeOp::TModifyScheme CreateIndexTask(NKikimr::NSchemeShard::TTableInde
namespace NKikimr::NSchemeShard {
-TVector<ISubOperation::TPtr> CreateConsistentCopyTables(TOperationId nextId, const TTxTransaction& tx, TOperationContext& context) {
+bool CreateConsistentCopyTables(
+ TOperationId nextId,
+ const TTxTransaction& tx,
+ TOperationContext& context,
+ TVector<ISubOperation::TPtr>& result)
+{
Y_ABORT_UNLESS(tx.GetOperationType() == NKikimrSchemeOp::EOperationType::ESchemeOpCreateConsistentCopyTables);
const auto& op = tx.GetCreateConsistentCopyTables();
if (0 == op.CopyTableDescriptionsSize()) {
TString msg = TStringBuilder() << "no task to do, empty list CopyTableDescriptions";
- return {CreateReject(nextId, NKikimrScheme::EStatus::StatusInvalidParameter, msg)};
+ result = {CreateReject(nextId, NKikimrScheme::EStatus::StatusInvalidParameter, msg)};
+ return false;
}
TPath firstPath = TPath::Resolve(op.GetCopyTableDescriptions(0).GetSrcPath(), context.SS);
@@ -66,7 +73,8 @@ TVector<ISubOperation::TPtr> CreateConsistentCopyTables(TOperationId nextId, con
.IsAtLocalSchemeShard();
if (!checks) {
- return {CreateReject(nextId, checks.GetStatus(), checks.GetError())};
+ result = {CreateReject(nextId, checks.GetStatus(), checks.GetError())};
+ return false;
}
}
@@ -80,20 +88,20 @@ TVector<ISubOperation::TPtr> CreateConsistentCopyTables(TOperationId nextId, con
: limits.MaxConsistentCopyTargets;
if (op.CopyTableDescriptionsSize() > limit) {
- return {CreateReject(nextId, NKikimrScheme::EStatus::StatusInvalidParameter, TStringBuilder()
+ result = {CreateReject(nextId, NKikimrScheme::EStatus::StatusInvalidParameter, TStringBuilder()
<< "Consistent copy object count limit exceeded"
<< ", limit: " << limit
<< ", objects: " << op.CopyTableDescriptionsSize()
)};
+ return false;
}
TString errStr;
if (!context.SS->CheckApplyIf(tx, errStr)) {
- return {CreateReject(nextId, NKikimrScheme::EStatus::StatusPreconditionFailed, errStr)};
+ result = {CreateReject(nextId, NKikimrScheme::EStatus::StatusPreconditionFailed, errStr)};
+ return false;
}
- TVector<ISubOperation::TPtr> result;
-
for (const auto& descr: op.GetCopyTableDescriptions()) {
const auto& srcStr = descr.GetSrcPath();
const auto& dstStr = descr.GetDstPath();
@@ -108,7 +116,8 @@ TVector<ISubOperation::TPtr> CreateConsistentCopyTables(TOperationId nextId, con
.IsTheSameDomain(firstPath);
if (!checks) {
- return {CreateReject(nextId, checks.GetStatus(), checks.GetError())};
+ result = {CreateReject(nextId, checks.GetStatus(), checks.GetError())};
+ return false;
}
}
@@ -135,7 +144,7 @@ TVector<ISubOperation::TPtr> CreateConsistentCopyTables(TOperationId nextId, con
}
result.push_back(CreateCopyTable(NextPartId(nextId, result),
- CopyTableTask(srcPath, dstPath, descr.GetOmitFollowers(), descr.GetIsBackup()), sequences));
+ CopyTableTask(srcPath, dstPath, descr.GetOmitFollowers(), descr.GetIsBackup(), descr.GetAllowUnderSameOperation()), sequences));
TVector<NKikimrSchemeOp::TSequenceDescription> sequenceDescriptions;
for (const auto& child: srcPath.Base()->GetChildren()) {
@@ -176,7 +185,7 @@ TVector<ISubOperation::TPtr> CreateConsistentCopyTables(TOperationId nextId, con
TPath dstImplTable = dstIndexPath.Child(srcImplTableName);
result.push_back(CreateCopyTable(NextPartId(nextId, result),
- CopyTableTask(srcImplTable, dstImplTable, descr.GetOmitFollowers(), descr.GetIsBackup())));
+ CopyTableTask(srcImplTable, dstImplTable, descr.GetOmitFollowers(), descr.GetIsBackup(), descr.GetAllowUnderSameOperation())));
}
for (auto&& sequenceDescription : sequenceDescriptions) {
@@ -193,6 +202,14 @@ TVector<ISubOperation::TPtr> CreateConsistentCopyTables(TOperationId nextId, con
}
}
+ return true;
+}
+
+TVector<ISubOperation::TPtr> CreateConsistentCopyTables(TOperationId nextId, const TTxTransaction& tx, TOperationContext& context) {
+ TVector<ISubOperation::TPtr> result;
+
+ CreateConsistentCopyTables(nextId, tx, context, result);
+
return result;
}
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_copy_table.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_copy_table.cpp
index cadf213e63..41e9a5570a 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_copy_table.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_copy_table.cpp
@@ -401,9 +401,12 @@ public:
.IsUnderTheSameOperation(OperationId.GetTxId()); //allow only as part of copying base table
} else {
checks
- .NotUnderOperation()
.IsCommonSensePath()
.IsLikeDirectory();
+
+ if (!Transaction.GetCreateTable().GetAllowUnderSameOperation()) {
+ checks.NotUnderOperation();
+ }
}
}
@@ -775,9 +778,12 @@ TVector<ISubOperation::TPtr> CreateCopyTable(TOperationId nextId, const TTxTrans
.NotDeleted()
.NotUnderDeleting()
.IsTable()
- .NotUnderOperation()
.IsCommonSensePath(); //forbid copy impl index tables directly
+ if (!copying.GetAllowUnderSameOperation()) {
+ checks.NotUnderOperation();
+ }
+
if (!checks) {
return {CreateReject(nextId, checks.GetStatus(), checks.GetError())};
}
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp
index 39edfcb062..7c9b8b67c3 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp
@@ -712,6 +712,19 @@ static void FillModifySchemaForCdc(
}
}
+void DoCreateStreamImpl(
+ TVector<ISubOperation::TPtr>& result,
+ const NKikimrSchemeOp::TCreateCdcStream& op,
+ const TOperationId& opId,
+ const TPath& tablePath,
+ const bool acceptExisted,
+ const bool initialScan)
+{
+ auto outTx = TransactionTemplate(tablePath.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpCreateCdcStreamImpl);
+ FillModifySchemaForCdc(outTx, op, opId, acceptExisted, initialScan);
+ result.push_back(CreateNewCdcStreamImpl(NextPartId(opId, result), outTx));
+}
+
void DoCreateStream(
TVector<ISubOperation::TPtr>& result,
const NKikimrSchemeOp::TCreateCdcStream& op,
@@ -721,11 +734,8 @@ void DoCreateStream(
const bool acceptExisted,
const bool initialScan)
{
- {
- auto outTx = TransactionTemplate(tablePath.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpCreateCdcStreamImpl);
- FillModifySchemaForCdc(outTx, op, opId, acceptExisted, initialScan);
- result.push_back(CreateNewCdcStreamImpl(NextPartId(opId, result), outTx));
- }
+ DoCreateStreamImpl(result, op, opId, tablePath, acceptExisted, initialScan);
+
{
auto outTx = TransactionTemplate(workingDirPath.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpCreateCdcStreamAtTable);
FillModifySchemaForCdc(outTx, op, opId, acceptExisted, initialScan);
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.h b/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.h
index 1aa03529be..4ba199de6c 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.h
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.h
@@ -22,6 +22,14 @@ std::variant<TStreamPaths, ISubOperation::TPtr> DoNewStreamPathChecks(
bool acceptExisted,
bool restore = false);
+void DoCreateStreamImpl(
+ TVector<ISubOperation::TPtr>& result,
+ const NKikimrSchemeOp::TCreateCdcStream& op,
+ const TOperationId& opId,
+ const TPath& tablePath,
+ const bool acceptExisted,
+ const bool initialScan);
+
void DoCreateStream(
TVector<ISubOperation::TPtr>& result,
const NKikimrSchemeOp::TCreateCdcStream& op,
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_part.h b/ydb/core/tx/schemeshard/schemeshard__operation_part.h
index b784ef579b..4b5cfe312e 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_part.h
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_part.h
@@ -459,6 +459,11 @@ ISubOperation::TPtr CreateDropTableIndex(TOperationId id, TTxState::ETxState sta
ISubOperation::TPtr CreateAlterTableIndex(TOperationId id, const TTxTransaction& tx);
ISubOperation::TPtr CreateAlterTableIndex(TOperationId id, TTxState::ETxState state);
+bool CreateConsistentCopyTables(
+ TOperationId nextId,
+ const TTxTransaction& tx,
+ TOperationContext& context,
+ TVector<ISubOperation::TPtr>& result);
TVector<ISubOperation::TPtr> CreateConsistentCopyTables(TOperationId nextId, const TTxTransaction& tx, TOperationContext& context);
ISubOperation::TPtr CreateNewOlapStore(TOperationId id, const TTxTransaction& tx);
@@ -655,6 +660,7 @@ ISubOperation::TPtr CreateNewBackupCollection(TOperationId id, TTxState::ETxStat
ISubOperation::TPtr CreateDropBackupCollection(TOperationId id, const TTxTransaction& tx);
ISubOperation::TPtr CreateDropBackupCollection(TOperationId id, TTxState::ETxState state);
+TVector<ISubOperation::TPtr> CreateBackupBackupCollection(TOperationId opId, const TTxTransaction& tx, TOperationContext& context);
}
}
diff --git a/ydb/core/tx/schemeshard/schemeshard_audit_log_fragment.cpp b/ydb/core/tx/schemeshard/schemeshard_audit_log_fragment.cpp
index 5a4cd7fe80..eb091e78d9 100644
--- a/ydb/core/tx/schemeshard/schemeshard_audit_log_fragment.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard_audit_log_fragment.cpp
@@ -255,6 +255,9 @@ TString DefineUserOperationName(const NKikimrSchemeOp::TModifyScheme& tx) {
return "ALTER BACKUP COLLECTION";
case NKikimrSchemeOp::EOperationType::ESchemeOpDropBackupCollection:
return "DROP BACKUP COLLECTION";
+
+ case NKikimrSchemeOp::EOperationType::ESchemeOpBackupBackupCollection:
+ return "BACKUP";
}
Y_ABORT("switch should cover all operation types");
}
@@ -576,6 +579,10 @@ TVector<TString> ExtractChangingPaths(const NKikimrSchemeOp::TModifyScheme& tx)
case NKikimrSchemeOp::EOperationType::ESchemeOpDropBackupCollection:
result.emplace_back(NKikimr::JoinPath({tx.GetWorkingDir(), tx.GetDropBackupCollection().GetName()}));
break;
+
+ case NKikimrSchemeOp::EOperationType::ESchemeOpBackupBackupCollection:
+ result.emplace_back(NKikimr::JoinPath({tx.GetWorkingDir(), tx.GetBackupBackupCollection().GetName()}));
+ break;
}
return result;
diff --git a/ydb/core/tx/schemeshard/ut_backup_collection/ut_backup_collection.cpp b/ydb/core/tx/schemeshard/ut_backup_collection/ut_backup_collection.cpp
index fd6d3e7b9d..bb772d5236 100644
--- a/ydb/core/tx/schemeshard/ut_backup_collection/ut_backup_collection.cpp
+++ b/ydb/core/tx/schemeshard/ut_backup_collection/ut_backup_collection.cpp
@@ -288,4 +288,58 @@ Y_UNIT_TEST_SUITE(TBackupCollectionTests) {
)");
env.TestWaitNotification(runtime, txId);
}
+
+ Y_UNIT_TEST(BackupAbsentCollection) {
+ TTestBasicRuntime runtime;
+ TTestEnv env(runtime, TTestEnvOptions().EnableBackupService(true));
+ ui64 txId = 100;
+
+ SetupLogging(runtime);
+
+ PrepareDirs(runtime, env, txId);
+
+ TestBackupBackupCollection(runtime, ++txId, "/MyRoot",
+ R"(Name: ".backups/collections/)" DEFAULT_NAME_1 R"(")",
+ {NKikimrScheme::EStatus::StatusPathDoesNotExist});
+ env.TestWaitNotification(runtime, txId);
+ }
+
+ Y_UNIT_TEST(BackupDroppedCollection) {
+ TTestBasicRuntime runtime;
+ TTestEnv env(runtime, TTestEnvOptions().EnableBackupService(true));
+ ui64 txId = 100;
+
+ SetupLogging(runtime);
+
+ PrepareDirs(runtime, env, txId);
+
+ TestCreateBackupCollection(runtime, ++txId, "/MyRoot/.backups/collections", DefaultCollectionSettings());
+ env.TestWaitNotification(runtime, txId);
+
+ TestDropBackupCollection(runtime, ++txId, "/MyRoot/.backups/collections", "Name: \"" DEFAULT_NAME_1 "\"");
+ env.TestWaitNotification(runtime, txId);
+
+ TestBackupBackupCollection(runtime, ++txId, "/MyRoot",
+ R"(Name: ".backups/collections/)" DEFAULT_NAME_1 R"(")",
+ {NKikimrScheme::EStatus::StatusPathDoesNotExist});
+ env.TestWaitNotification(runtime, txId);
+ }
+
+ Y_UNIT_TEST(BackupAbsentDirs) {
+ TTestBasicRuntime runtime;
+ TTestEnv env(runtime, TTestEnvOptions().EnableBackupService(true));
+ ui64 txId = 100;
+
+ SetupLogging(runtime);
+
+ PrepareDirs(runtime, env, txId);
+
+ TestCreateBackupCollection(runtime, ++txId, "/MyRoot/.backups/collections", DefaultCollectionSettings());
+ env.TestWaitNotification(runtime, txId);
+
+ TestBackupBackupCollection(runtime, ++txId, "/MyRoot",
+ R"(Name: ".backups/collections/)" DEFAULT_NAME_1 R"(")",
+ {NKikimrScheme::EStatus::StatusPathDoesNotExist});
+ env.TestWaitNotification(runtime, txId);
+ }
} // TBackupCollectionTests
diff --git a/ydb/core/tx/schemeshard/ut_base/ut_base.cpp b/ydb/core/tx/schemeshard/ut_base/ut_base.cpp
index 5228885af3..170d5f3c29 100644
--- a/ydb/core/tx/schemeshard/ut_base/ut_base.cpp
+++ b/ydb/core/tx/schemeshard/ut_base/ut_base.cpp
@@ -11376,4 +11376,138 @@ Y_UNIT_TEST_SUITE(TSchemeShardTest) {
TestCopyTable(runtime, ++txId, "/MyRoot", "SystemColumnInCopyAllowed", "/MyRoot/SystemColumnAllowed");
}
+
+ Y_UNIT_TEST_FLAG(BackupBackupCollection, WithIncremental) {
+ TTestBasicRuntime runtime;
+ TTestEnv env(runtime, TTestEnvOptions().EnableBackupService(true));
+ ui64 txId = 100;
+
+ auto defaultCollectionSettings = []() {
+ return TString(R"(
+ Name: "MyCollection1"
+ ExplicitEntryList {
+ Entries {
+ Type: ETypeTable
+ Path: "/MyRoot/Table1"
+ }
+ Entries {
+ Type: ETypeTable
+ Path: "/MyRoot/DirA/Table2"
+ }
+ Entries {
+ Type: ETypeTable
+ Path: "/MyRoot/DirA/DirB/Table3"
+ }
+ }
+ Cluster {}
+ )") + (WithIncremental ? TString("IncrementalBackupConfig {} \n") : TString());
+ };
+
+ AsyncMkDir(runtime, ++txId, "/MyRoot", "DirA");
+ AsyncMkDir(runtime, ++txId, "/MyRoot/DirA", "DirB");
+ AsyncCreateTable(runtime, ++txId, "/MyRoot", R"(
+ Name: "Table1"
+ Columns { Name: "key" Type: "Uint64" }
+ Columns { Name: "value0" Type: "Utf8" }
+ KeyColumnNames: ["key"]
+ )");
+ AsyncCreateIndexedTable(runtime, ++txId, "/MyRoot/DirA", R"(
+ TableDescription {
+ Name: "Table2"
+ Columns { Name: "key" Type: "Uint64" }
+ Columns { Name: "value0" Type: "Utf8" }
+ Columns { Name: "value1" Type: "Utf8" }
+ KeyColumnNames: ["key"]
+ }
+ IndexDescription {
+ Name: "UserDefinedIndexByValue0"
+ KeyColumnNames: ["value0"]
+ }
+ IndexDescription {
+ Name: "UserDefinedIndexByValues"
+ KeyColumnNames: ["value0", "value1"]
+ }
+ IndexDescription {
+ Name: "UserDefinedIndexByValue0CoveringValue1"
+ KeyColumnNames: ["value0"]
+ DataColumnNames: ["value1"]
+ }
+ )");
+ AsyncCreateTable(runtime, ++txId, "/MyRoot/DirA/DirB", R"(
+ Name: "Table3"
+ Columns { Name: "key" Type: "Uint64" }
+ Columns { Name: "value0" Type: "Utf8" }
+ KeyColumnNames: ["key"]
+ )");
+
+ TestModificationResult(runtime, txId - 4, NKikimrScheme::StatusAccepted);
+ TestModificationResult(runtime, txId - 3, NKikimrScheme::StatusAccepted);
+ TestModificationResult(runtime, txId - 2, NKikimrScheme::StatusAccepted);
+ TestModificationResult(runtime, txId - 1, NKikimrScheme::StatusAccepted);
+ TestModificationResult(runtime, txId - 0, NKikimrScheme::StatusAccepted);
+
+ env.TestWaitNotification(runtime, {txId, txId - 1 , txId - 2, txId - 3, txId - 4});
+
+ TestMkDir(runtime, ++txId, "/MyRoot", ".backups/collections");
+ TestCreateBackupCollection(runtime, ++txId, "/MyRoot/.backups/collections", defaultCollectionSettings());
+
+ env.TestWaitNotification(runtime, {txId, txId - 1});
+
+ TestBackupBackupCollection(runtime, ++txId, "/MyRoot", R"(
+ Name: ".backups/collections/MyCollection1"
+ )");
+
+ env.TestWaitNotification(runtime, txId);
+
+ TestDescribeResult(DescribePath(runtime, "/MyRoot/.backups/collections/MyCollection1"), {
+ NLs::PathExist,
+ NLs::IsBackupCollection,
+ NLs::ChildrenCount(1),
+ NLs::Finished,
+ });
+
+ auto descr = DescribePath(runtime, "/MyRoot/.backups/collections/MyCollection1").GetPathDescription();
+ UNIT_ASSERT_VALUES_EQUAL(descr.GetChildren().size(), 1);
+
+ auto backupDirName = descr.GetChildren(0).GetName().c_str();
+
+ TestDescribeResult(DescribePath(runtime, Sprintf("/MyRoot/.backups/collections/MyCollection1/%s", backupDirName)), {
+ NLs::PathExist,
+ NLs::ChildrenCount(2),
+ NLs::Finished,
+ });
+
+ TestDescribeResult(DescribePath(runtime, Sprintf("/MyRoot/.backups/collections/MyCollection1/%s/Table1", backupDirName)), {
+ NLs::PathExist,
+ NLs::IsTable,
+ NLs::Finished,
+ });
+
+ TestDescribeResult(DescribePath(runtime, Sprintf("/MyRoot/.backups/collections/MyCollection1/%s/DirA", backupDirName)), {
+ NLs::PathExist,
+ NLs::ChildrenCount(2),
+ NLs::Finished,
+ });
+
+ TestDescribeResult(DescribePath(runtime, Sprintf("/MyRoot/.backups/collections/MyCollection1/%s/DirA/Table2", backupDirName)), {
+ NLs::PathExist,
+ NLs::IsTable,
+ NLs::Finished,
+ });
+
+ TestDescribeResult(DescribePath(runtime, Sprintf("/MyRoot/.backups/collections/MyCollection1/%s/DirA/DirB", backupDirName)), {
+ NLs::PathExist,
+ NLs::ChildrenCount(1),
+ NLs::Finished,
+ });
+
+ TestDescribeResult(DescribePath(runtime, Sprintf("/MyRoot/.backups/collections/MyCollection1/%s/DirA/DirB/Table3", backupDirName)), {
+ NLs::PathExist,
+ NLs::IsTable,
+ NLs::Finished,
+ });
+
+ // TODO: validate no index created
+ // TODO: validate no stream created
+ }
}
diff --git a/ydb/core/tx/schemeshard/ut_helpers/helpers.cpp b/ydb/core/tx/schemeshard/ut_helpers/helpers.cpp
index c525a0f677..1000d3f109 100644
--- a/ydb/core/tx/schemeshard/ut_helpers/helpers.cpp
+++ b/ydb/core/tx/schemeshard/ut_helpers/helpers.cpp
@@ -924,6 +924,7 @@ namespace NSchemeShardUT_Private {
GENERIC_HELPERS(CreateBackupCollection, NKikimrSchemeOp::EOperationType::ESchemeOpCreateBackupCollection, &NKikimrSchemeOp::TModifyScheme::MutableCreateBackupCollection)
GENERIC_HELPERS(DropBackupCollection, NKikimrSchemeOp::EOperationType::ESchemeOpDropBackupCollection, &NKikimrSchemeOp::TModifyScheme::MutableDropBackupCollection)
DROP_BY_PATH_ID_HELPERS(DropBackupCollection, NKikimrSchemeOp::EOperationType::ESchemeOpDropBackupCollection)
+ GENERIC_HELPERS(BackupBackupCollection, NKikimrSchemeOp::EOperationType::ESchemeOpBackupBackupCollection, &NKikimrSchemeOp::TModifyScheme::MutableBackupBackupCollection)
#undef DROP_BY_PATH_ID_HELPERS
#undef GENERIC_WITH_ATTRS_HELPERS
diff --git a/ydb/core/tx/schemeshard/ut_helpers/helpers.h b/ydb/core/tx/schemeshard/ut_helpers/helpers.h
index 2378346fbd..f462cf9872 100644
--- a/ydb/core/tx/schemeshard/ut_helpers/helpers.h
+++ b/ydb/core/tx/schemeshard/ut_helpers/helpers.h
@@ -299,6 +299,7 @@ namespace NSchemeShardUT_Private {
GENERIC_HELPERS(CreateBackupCollection);
GENERIC_HELPERS(DropBackupCollection);
DROP_BY_PATH_ID_HELPERS(DropBackupCollection);
+ GENERIC_HELPERS(BackupBackupCollection);
#undef DROP_BY_PATH_ID_HELPERS
#undef GENERIC_WITH_ATTRS_HELPERS
diff --git a/ydb/core/tx/schemeshard/ya.make b/ydb/core/tx/schemeshard/ya.make
index 337d4ea650..7b44b323a3 100644
--- a/ydb/core/tx/schemeshard/ya.make
+++ b/ydb/core/tx/schemeshard/ya.make
@@ -61,10 +61,10 @@ SRCS(
operation_queue_timer.h
schemeshard.cpp
schemeshard__background_cleaning.cpp
+ schemeshard__background_compaction.cpp
schemeshard__backup_collection_common.cpp
schemeshard__borrowed_compaction.cpp
schemeshard__clean_pathes.cpp
- schemeshard__background_compaction.cpp
schemeshard__conditional_erase.cpp
schemeshard__delete_tablet_reply.cpp
schemeshard__describe_scheme.cpp
@@ -100,19 +100,20 @@ SRCS(
schemeshard__operation_alter_user_attrs.cpp
schemeshard__operation_apply_build_index.cpp
schemeshard__operation_assign_bsv.cpp
+ schemeshard__operation_backup_backup_collection.cpp
schemeshard__operation_blob_depot.cpp
schemeshard__operation_cancel_tx.cpp
schemeshard__operation_cansel_build_index.cpp
- schemeshard__operation_common.h
schemeshard__operation_common.cpp
- schemeshard__operation_common_pq.cpp
+ schemeshard__operation_common.h
schemeshard__operation_common_bsv.cpp
schemeshard__operation_common_cdc_stream.cpp
schemeshard__operation_common_external_data_source.cpp
schemeshard__operation_common_external_table.cpp
+ schemeshard__operation_common_pq.cpp
schemeshard__operation_common_resource_pool.cpp
- schemeshard__operation_common_subdomain.h
schemeshard__operation_common_subdomain.cpp
+ schemeshard__operation_common_subdomain.h
schemeshard__operation_consistent_copy_tables.cpp
schemeshard__operation_copy_sequence.cpp
schemeshard__operation_copy_table.cpp
diff --git a/ydb/core/tx/tx_proxy/schemereq.cpp b/ydb/core/tx/tx_proxy/schemereq.cpp
index b05f88f91d..12fe027d2d 100644
--- a/ydb/core/tx/tx_proxy/schemereq.cpp
+++ b/ydb/core/tx/tx_proxy/schemereq.cpp
@@ -374,6 +374,9 @@ struct TBaseSchemeReq: public TActorBootstrapped<TDerived> {
case NKikimrSchemeOp::ESchemeOpDropBackupCollection:
return *modifyScheme.MutableDropBackupCollection()->MutableName();
+
+ case NKikimrSchemeOp::ESchemeOpBackupBackupCollection:
+ return *modifyScheme.MutableBackupBackupCollection()->MutableName();
}
}
@@ -749,6 +752,15 @@ struct TBaseSchemeReq: public TActorBootstrapped<TDerived> {
}
break;
}
+ case NKikimrSchemeOp::ESchemeOpBackupBackupCollection: {
+ auto toResolve = TPathToResolve(pbModifyScheme.GetOperationType());
+ toResolve.Path = workingDir;
+ auto collectionPath = SplitPath(pbModifyScheme.GetBackupBackupCollection().GetName());
+ std::move(collectionPath.begin(), collectionPath.end(), std::back_inserter(toResolve.Path));
+ toResolve.RequiredAccess = NACLib::EAccessRights::GenericWrite;
+ ResolveForACL.push_back(toResolve);
+ break;
+ }
case NKikimrSchemeOp::ESchemeOpMoveTable: {
auto& descr = pbModifyScheme.GetMoveTable();
{