diff options
author | Innokentii Mokin <innokentii@ydb.tech> | 2024-12-04 12:42:02 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-12-04 12:42:02 +0300 |
commit | 42c202536b8cc47fd79dc8d1836e183942368fc7 (patch) | |
tree | 21113ef99820cf8db8fb11099369c92d3f46d05b | |
parent | d215bee2133240c360a5b745e3c41d8698e9bb57 (diff) | |
download | ydb-42c202536b8cc47fd79dc8d1836e183942368fc7.tar.gz |
Fix incremental backup cdc creation (#12267)
-rw-r--r-- | ydb/core/tx/datashard/datashard_ut_incremental_backup.cpp | 73 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/schemeshard__operation_consistent_copy_tables.cpp | 23 |
2 files changed, 87 insertions, 9 deletions
diff --git a/ydb/core/tx/datashard/datashard_ut_incremental_backup.cpp b/ydb/core/tx/datashard/datashard_ut_incremental_backup.cpp index 9744a9c0ac..943853664e 100644 --- a/ydb/core/tx/datashard/datashard_ut_incremental_backup.cpp +++ b/ydb/core/tx/datashard/datashard_ut_incremental_backup.cpp @@ -359,6 +359,77 @@ Y_UNIT_TEST_SUITE(IncrementalBackup) { )")); } -} // Cdc + Y_UNIT_TEST_TWIN(SimpleBackupBackupCollection, WithIncremental) { + TPortManager portManager; + TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2134), {}, DefaultPQConfig()) + .SetUseRealThreads(false) + .SetDomainName("Root") + .SetEnableChangefeedInitialScan(true) + .SetEnableBackupService(true) + ); + + auto& runtime = *server->GetRuntime(); + const auto edgeActor = runtime.AllocateEdgeActor(); + + SetupLogging(runtime); + InitRoot(server, edgeActor); + CreateShardedTable(server, edgeActor, "/Root", "Table", SimpleTable()); + + ExecSQL(server, edgeActor, R"( + UPSERT INTO `/Root/Table` (key, value) VALUES + (1, 10) + , (2, 20) + , (3, 30) + ; + )"); + + ExecSQL(server, edgeActor, R"( + CREATE BACKUP COLLECTION `MyCollection` + ( TABLE `/Root/Table` + ) + WITH + ( STORAGE = 'cluster' + , INCREMENTAL_BACKUP_ENABLED = ')" + TString(WithIncremental ? "true" : "false") + R"(' + ); + )", false); + + ExecSQL(server, edgeActor, R"(BACKUP `MyCollection`;)", false); + + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleExec(runtime, R"( + -- TODO: fix with navigate after proper scheme cache handling + SELECT key, value FROM `/Root/.backups/collections/MyCollection/19700101000001Z_full/Table` + ORDER BY key + )"), + KqpSimpleExec(runtime, R"( + SELECT key, value FROM `/Root/Table` + ORDER BY key + )")); + + + if (WithIncremental) { + ExecSQL(server, edgeActor, R"( + UPSERT INTO `/Root/Table` (key, value) VALUES + (2, 200); + )"); + + ExecSQL(server, edgeActor, R"(DELETE FROM `/Root/Table` WHERE key=1;)"); + + ExecSQL(server, edgeActor, R"(BACKUP `MyCollection` INCREMENTAL;)", false); + + SimulateSleep(server, TDuration::Seconds(1)); + + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleExec(runtime, R"( + -- TODO: fix with navigate after proper scheme cache handling + SELECT key, value FROM `/Root/.backups/collections/MyCollection/19700101000002Z_incremental/Table` + ORDER BY key + )"), + "{ items { uint32_value: 1 } items { null_flag_value: NULL_VALUE } }, " + "{ items { uint32_value: 2 } items { uint32_value: 200 } }"); + } + } + +} // Y_UNIT_TEST_SUITE(IncrementalBackup) } // NKikimr diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_consistent_copy_tables.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_consistent_copy_tables.cpp index f8ee48da85..eead561dc0 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_consistent_copy_tables.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_consistent_copy_tables.cpp @@ -9,7 +9,7 @@ #include <util/generic/algorithm.h> -NKikimrSchemeOp::TModifyScheme CopyTableTask(NKikimr::NSchemeShard::TPath& src, NKikimr::NSchemeShard::TPath& dst, bool omitFollowers, bool isBackup, bool allowUnderSameOp) { +NKikimrSchemeOp::TModifyScheme CopyTableTask(NKikimr::NSchemeShard::TPath& src, NKikimr::NSchemeShard::TPath& dst, const NKikimrSchemeOp::TCopyTableConfig& descr) { using namespace NKikimr::NSchemeShard; auto scheme = TransactionTemplate(dst.Parent().PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpCreateTable); @@ -18,9 +18,13 @@ NKikimrSchemeOp::TModifyScheme CopyTableTask(NKikimr::NSchemeShard::TPath& src, auto operation = scheme.MutableCreateTable(); operation->SetName(dst.LeafName()); operation->SetCopyFromTable(src.PathString()); - operation->SetOmitFollowers(omitFollowers); - operation->SetIsBackup(isBackup); - operation->SetAllowUnderSameOperation(allowUnderSameOp); + operation->SetOmitFollowers(descr.GetOmitFollowers()); + operation->SetIsBackup(descr.GetIsBackup()); + operation->SetAllowUnderSameOperation(descr.GetAllowUnderSameOperation()); + if (descr.HasCreateSrcCdcStream()) { + auto* coOp = scheme.MutableCreateCdcStream(); + coOp->CopyFrom(descr.GetCreateSrcCdcStream()); + } return scheme; } @@ -144,8 +148,10 @@ bool CreateConsistentCopyTables( sequences.emplace(sequenceName); } - result.push_back(CreateCopyTable(NextPartId(nextId, result), - CopyTableTask(srcPath, dstPath, descr.GetOmitFollowers(), descr.GetIsBackup(), descr.GetAllowUnderSameOperation()), sequences)); + result.push_back(CreateCopyTable( + NextPartId(nextId, result), + CopyTableTask(srcPath, dstPath, descr), + sequences)); TVector<NKikimrSchemeOp::TSequenceDescription> sequenceDescriptions; for (const auto& child: srcPath.Base()->GetChildren()) { @@ -190,8 +196,9 @@ bool CreateConsistentCopyTables( Y_ABORT_UNLESS(srcImplTable.Base()->PathId == srcIndexPath.Base()->GetChildren().begin()->second); TPath dstImplTable = dstIndexPath.Child(srcImplTableName); - result.push_back(CreateCopyTable(NextPartId(nextId, result), - CopyTableTask(srcImplTable, dstImplTable, descr.GetOmitFollowers(), descr.GetIsBackup(), descr.GetAllowUnderSameOperation()))); + result.push_back(CreateCopyTable( + NextPartId(nextId, result), + CopyTableTask(srcImplTable, dstImplTable, descr))); } for (auto&& sequenceDescription : sequenceDescriptions) { |