aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorInnokentii Mokin <innokentii@ydb.tech>2024-12-04 12:42:02 +0300
committerGitHub <noreply@github.com>2024-12-04 12:42:02 +0300
commit42c202536b8cc47fd79dc8d1836e183942368fc7 (patch)
tree21113ef99820cf8db8fb11099369c92d3f46d05b
parentd215bee2133240c360a5b745e3c41d8698e9bb57 (diff)
downloadydb-42c202536b8cc47fd79dc8d1836e183942368fc7.tar.gz
Fix incremental backup cdc creation (#12267)
-rw-r--r--ydb/core/tx/datashard/datashard_ut_incremental_backup.cpp73
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_consistent_copy_tables.cpp23
2 files changed, 87 insertions, 9 deletions
diff --git a/ydb/core/tx/datashard/datashard_ut_incremental_backup.cpp b/ydb/core/tx/datashard/datashard_ut_incremental_backup.cpp
index 9744a9c0ac..943853664e 100644
--- a/ydb/core/tx/datashard/datashard_ut_incremental_backup.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_incremental_backup.cpp
@@ -359,6 +359,77 @@ Y_UNIT_TEST_SUITE(IncrementalBackup) {
)"));
}
-} // Cdc
+ Y_UNIT_TEST_TWIN(SimpleBackupBackupCollection, WithIncremental) {
+ TPortManager portManager;
+ TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2134), {}, DefaultPQConfig())
+ .SetUseRealThreads(false)
+ .SetDomainName("Root")
+ .SetEnableChangefeedInitialScan(true)
+ .SetEnableBackupService(true)
+ );
+
+ auto& runtime = *server->GetRuntime();
+ const auto edgeActor = runtime.AllocateEdgeActor();
+
+ SetupLogging(runtime);
+ InitRoot(server, edgeActor);
+ CreateShardedTable(server, edgeActor, "/Root", "Table", SimpleTable());
+
+ ExecSQL(server, edgeActor, R"(
+ UPSERT INTO `/Root/Table` (key, value) VALUES
+ (1, 10)
+ , (2, 20)
+ , (3, 30)
+ ;
+ )");
+
+ ExecSQL(server, edgeActor, R"(
+ CREATE BACKUP COLLECTION `MyCollection`
+ ( TABLE `/Root/Table`
+ )
+ WITH
+ ( STORAGE = 'cluster'
+ , INCREMENTAL_BACKUP_ENABLED = ')" + TString(WithIncremental ? "true" : "false") + R"('
+ );
+ )", false);
+
+ ExecSQL(server, edgeActor, R"(BACKUP `MyCollection`;)", false);
+
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleExec(runtime, R"(
+ -- TODO: fix with navigate after proper scheme cache handling
+ SELECT key, value FROM `/Root/.backups/collections/MyCollection/19700101000001Z_full/Table`
+ ORDER BY key
+ )"),
+ KqpSimpleExec(runtime, R"(
+ SELECT key, value FROM `/Root/Table`
+ ORDER BY key
+ )"));
+
+
+ if (WithIncremental) {
+ ExecSQL(server, edgeActor, R"(
+ UPSERT INTO `/Root/Table` (key, value) VALUES
+ (2, 200);
+ )");
+
+ ExecSQL(server, edgeActor, R"(DELETE FROM `/Root/Table` WHERE key=1;)");
+
+ ExecSQL(server, edgeActor, R"(BACKUP `MyCollection` INCREMENTAL;)", false);
+
+ SimulateSleep(server, TDuration::Seconds(1));
+
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleExec(runtime, R"(
+ -- TODO: fix with navigate after proper scheme cache handling
+ SELECT key, value FROM `/Root/.backups/collections/MyCollection/19700101000002Z_incremental/Table`
+ ORDER BY key
+ )"),
+ "{ items { uint32_value: 1 } items { null_flag_value: NULL_VALUE } }, "
+ "{ items { uint32_value: 2 } items { uint32_value: 200 } }");
+ }
+ }
+
+} // Y_UNIT_TEST_SUITE(IncrementalBackup)
} // NKikimr
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_consistent_copy_tables.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_consistent_copy_tables.cpp
index f8ee48da85..eead561dc0 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_consistent_copy_tables.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_consistent_copy_tables.cpp
@@ -9,7 +9,7 @@
#include <util/generic/algorithm.h>
-NKikimrSchemeOp::TModifyScheme CopyTableTask(NKikimr::NSchemeShard::TPath& src, NKikimr::NSchemeShard::TPath& dst, bool omitFollowers, bool isBackup, bool allowUnderSameOp) {
+NKikimrSchemeOp::TModifyScheme CopyTableTask(NKikimr::NSchemeShard::TPath& src, NKikimr::NSchemeShard::TPath& dst, const NKikimrSchemeOp::TCopyTableConfig& descr) {
using namespace NKikimr::NSchemeShard;
auto scheme = TransactionTemplate(dst.Parent().PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpCreateTable);
@@ -18,9 +18,13 @@ NKikimrSchemeOp::TModifyScheme CopyTableTask(NKikimr::NSchemeShard::TPath& src,
auto operation = scheme.MutableCreateTable();
operation->SetName(dst.LeafName());
operation->SetCopyFromTable(src.PathString());
- operation->SetOmitFollowers(omitFollowers);
- operation->SetIsBackup(isBackup);
- operation->SetAllowUnderSameOperation(allowUnderSameOp);
+ operation->SetOmitFollowers(descr.GetOmitFollowers());
+ operation->SetIsBackup(descr.GetIsBackup());
+ operation->SetAllowUnderSameOperation(descr.GetAllowUnderSameOperation());
+ if (descr.HasCreateSrcCdcStream()) {
+ auto* coOp = scheme.MutableCreateCdcStream();
+ coOp->CopyFrom(descr.GetCreateSrcCdcStream());
+ }
return scheme;
}
@@ -144,8 +148,10 @@ bool CreateConsistentCopyTables(
sequences.emplace(sequenceName);
}
- result.push_back(CreateCopyTable(NextPartId(nextId, result),
- CopyTableTask(srcPath, dstPath, descr.GetOmitFollowers(), descr.GetIsBackup(), descr.GetAllowUnderSameOperation()), sequences));
+ result.push_back(CreateCopyTable(
+ NextPartId(nextId, result),
+ CopyTableTask(srcPath, dstPath, descr),
+ sequences));
TVector<NKikimrSchemeOp::TSequenceDescription> sequenceDescriptions;
for (const auto& child: srcPath.Base()->GetChildren()) {
@@ -190,8 +196,9 @@ bool CreateConsistentCopyTables(
Y_ABORT_UNLESS(srcImplTable.Base()->PathId == srcIndexPath.Base()->GetChildren().begin()->second);
TPath dstImplTable = dstIndexPath.Child(srcImplTableName);
- result.push_back(CreateCopyTable(NextPartId(nextId, result),
- CopyTableTask(srcImplTable, dstImplTable, descr.GetOmitFollowers(), descr.GetIsBackup(), descr.GetAllowUnderSameOperation())));
+ result.push_back(CreateCopyTable(
+ NextPartId(nextId, result),
+ CopyTableTask(srcImplTable, dstImplTable, descr)));
}
for (auto&& sequenceDescription : sequenceDescriptions) {