diff options
author | Ilnaz Nizametdinov <i.nizametdinov@gmail.com> | 2022-06-06 14:51:41 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-06-06 14:51:41 +0300 |
commit | f3748b98db7af1f1fd7e345bb6e8d452387d3366 (patch) | |
tree | 09f04609360efb6dc13bfb7269a04abf2a069ffb | |
parent | 94a5b9b2d4bb884f5a964c47268f2821a04a334c (diff) | |
download | ydb-f3748b98db7af1f1fd7e345bb6e8d452387d3366.tar.gz |
22-2: Additional tests KIKIMR-13698
merge from trunk: r9376681, r9381053, r9385261
REVIEW: 2503366
x-ydb-stable-ref: 21ac0cb179e5c675610d7931a5979edbbb12e072
-rw-r--r-- | ydb/core/tx/datashard/change_collector_cdc_stream.cpp | 14 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_ut_change_exchange.cpp | 14 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_ut_common.cpp | 25 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_ut_common.h | 6 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/schemeshard__operation_move_tables.cpp | 22 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/ut_cdc_stream.cpp | 68 |
6 files changed, 137 insertions, 12 deletions
diff --git a/ydb/core/tx/datashard/change_collector_cdc_stream.cpp b/ydb/core/tx/datashard/change_collector_cdc_stream.cpp index 24d533a52a..688b32b23e 100644 --- a/ydb/core/tx/datashard/change_collector_cdc_stream.cpp +++ b/ydb/core/tx/datashard/change_collector_cdc_stream.cpp @@ -62,17 +62,21 @@ bool TCdcStreamChangeCollector::NeedToReadKeys() const { return *CachedNeedToReadKeys; } + bool value = false; for (const auto& [_, tableInfo] : Self->GetUserTables()) { for (const auto& [_, streamInfo] : tableInfo->CdcStreams) { + if (streamInfo.State == NKikimrSchemeOp::ECdcStreamStateDisabled) { + continue; + } + switch (streamInfo.Mode) { case NKikimrSchemeOp::ECdcStreamModeKeysOnly: case NKikimrSchemeOp::ECdcStreamModeUpdate: - CachedNeedToReadKeys = false; break; case NKikimrSchemeOp::ECdcStreamModeNewImage: case NKikimrSchemeOp::ECdcStreamModeOldImage: case NKikimrSchemeOp::ECdcStreamModeNewAndOldImages: - CachedNeedToReadKeys = true; + value = true; break; default: Y_FAIL_S("Invalid stream mode: " << static_cast<ui32>(streamInfo.Mode)); @@ -80,7 +84,7 @@ bool TCdcStreamChangeCollector::NeedToReadKeys() const { } } - Y_VERIFY(CachedNeedToReadKeys); + CachedNeedToReadKeys = value; return *CachedNeedToReadKeys; } @@ -113,6 +117,10 @@ bool TCdcStreamChangeCollector::Collect(const TTableId& tableId, ERowOp rop, bool read = false; for (const auto& [pathId, stream] : userTable->CdcStreams) { + if (stream.State == NKikimrSchemeOp::ECdcStreamStateDisabled) { + continue; + } + switch (stream.Mode) { case NKikimrSchemeOp::ECdcStreamModeKeysOnly: Persist(tableId, pathId, rop, key, keyTags, {}); diff --git a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp index 8ae06589eb..9a2d721607 100644 --- a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp +++ b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp @@ -1347,6 +1347,20 @@ Y_UNIT_TEST_SUITE(Cdc) { }); } + Y_UNIT_TEST(DisableStream) { + auto action = [](TServer::TPtr server) { + return AsyncAlterDisableStream(server, "/Root", "Table", "Stream"); + }; + + ShouldDeliverChanges(SimpleTable(), Updates(NKikimrSchemeOp::ECdcStreamFormatJson), action, { + R"(UPSERT INTO `/Root/Table` (key, value) VALUES (1, 10);)", + }, { + R"(UPSERT INTO `/Root/Table` (key, value) VALUES (2, 20);)", + }, { + R"({"update":{"value":10},"key":[1]})", + }); + } + // Split/merge Y_UNIT_TEST(ShouldDeliverChangesOnSplitMerge) { TTestPqEnv env(SimpleTable(), Updates(NKikimrSchemeOp::ECdcStreamFormatJson), false); diff --git a/ydb/core/tx/datashard/datashard_ut_common.cpp b/ydb/core/tx/datashard/datashard_ut_common.cpp index 7c2b78d6a6..bb6d0af58b 100644 --- a/ydb/core/tx/datashard/datashard_ut_common.cpp +++ b/ydb/core/tx/datashard/datashard_ut_common.cpp @@ -1704,6 +1704,31 @@ ui64 AsyncAlterAddStream( return ev->Get()->Record.GetTxId(); } +ui64 AsyncAlterDisableStream( + Tests::TServer::TPtr server, + const TString& workingDir, + const TString& tableName, + const TString& streamName) +{ + auto &runtime = *server->GetRuntime(); + auto sender = runtime.AllocateEdgeActor(); + + auto request = MakeHolder<TEvTxUserProxy::TEvProposeTransaction>(); + request->Record.SetExecTimeoutPeriod(Max<ui64>()); + auto &tx = *request->Record.MutableTransaction()->MutableModifyScheme(); + tx.SetOperationType(NKikimrSchemeOp::ESchemeOpAlterCdcStream); + tx.SetWorkingDir(workingDir); + auto &desc = *tx.MutableAlterCdcStream(); + desc.SetTableName(tableName); + desc.SetStreamName(streamName); + desc.MutableDisable(); + + runtime.Send(new IEventHandle(MakeTxProxyID(), sender, request.Release())); + auto ev = runtime.GrabEdgeEventRethrow<TEvTxUserProxy::TEvProposeTransactionStatus>(sender); + UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Record.GetStatus(), TEvTxUserProxy::TEvProposeTransactionStatus::EStatus::ExecInProgress); + return ev->Get()->Record.GetTxId(); +} + ui64 AsyncAlterDropStream( Tests::TServer::TPtr server, const TString& workingDir, diff --git a/ydb/core/tx/datashard/datashard_ut_common.h b/ydb/core/tx/datashard/datashard_ut_common.h index 144f75526c..972d0530b8 100644 --- a/ydb/core/tx/datashard/datashard_ut_common.h +++ b/ydb/core/tx/datashard/datashard_ut_common.h @@ -586,6 +586,12 @@ ui64 AsyncAlterAddStream( const TString& tableName, const TShardedTableOptions::TCdcStream& streamDesc); +ui64 AsyncAlterDisableStream( + Tests::TServer::TPtr server, + const TString& workingDir, + const TString& tableName, + const TString& streamName); + ui64 AsyncAlterDropStream( Tests::TServer::TPtr server, const TString& workingDir, diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_move_tables.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_move_tables.cpp index 8403511636..3e287ba3c1 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_move_tables.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_move_tables.cpp @@ -97,27 +97,31 @@ TVector<ISubOperationBase::TPtr> CreateConsistentMoveTable(TOperationId nextId, for (auto& child: srcPath.Base()->GetChildren()) { auto name = child.first; - TPath srcIndexPath = srcPath.Child(name); - if (srcIndexPath.IsDeleted()) { + TPath srcChildPath = srcPath.Child(name); + if (srcChildPath.IsDeleted()) { continue; } + if (srcChildPath.IsCdcStream()) { + return {CreateReject(nextId, NKikimrScheme::StatusPreconditionFailed, "Cannot move table with cdc streams")}; + } + TPath dstIndexPath = dstPath.Child(name); - Y_VERIFY(srcIndexPath.Base()->PathId == child.second); - Y_VERIFY_S(srcIndexPath.Base()->GetChildren().size() == 1, - srcIndexPath.PathString() << " has children " << srcIndexPath.Base()->GetChildren().size()); + Y_VERIFY(srcChildPath.Base()->PathId == child.second); + Y_VERIFY_S(srcChildPath.Base()->GetChildren().size() == 1, + srcChildPath.PathString() << " has children " << srcChildPath.Base()->GetChildren().size()); result.push_back(CreateMoveTableIndex(TOperationId(nextId.GetTxId(), nextId.GetSubTxId() + result.size()), - MoveTableIndexTask(srcIndexPath, dstIndexPath))); + MoveTableIndexTask(srcChildPath, dstIndexPath))); - TString srcImplTableName = srcIndexPath.Base()->GetChildren().begin()->first; - TPath srcImplTable = srcIndexPath.Child(srcImplTableName); + TString srcImplTableName = srcChildPath.Base()->GetChildren().begin()->first; + TPath srcImplTable = srcChildPath.Child(srcImplTableName); if (srcImplTable.IsDeleted()) { continue; } - Y_VERIFY(srcImplTable.Base()->PathId == srcIndexPath.Base()->GetChildren().begin()->second); + Y_VERIFY(srcImplTable.Base()->PathId == srcChildPath.Base()->GetChildren().begin()->second); TPath dstImplTable = dstIndexPath.Child(srcImplTableName); diff --git a/ydb/core/tx/schemeshard/ut_cdc_stream.cpp b/ydb/core/tx/schemeshard/ut_cdc_stream.cpp index aedf7a458e..a678d77ab9 100644 --- a/ydb/core/tx/schemeshard/ut_cdc_stream.cpp +++ b/ydb/core/tx/schemeshard/ut_cdc_stream.cpp @@ -397,4 +397,72 @@ Y_UNIT_TEST_SUITE(TCdcStreamTests) { TestDropPQGroup(runtime, ++txId, "/MyRoot/Table/Stream", "streamImpl", {NKikimrScheme::StatusNameConflict}); } + Y_UNIT_TEST(CopyTableShouldNotCopyStream) { + TTestBasicRuntime runtime; + TTestEnv env(runtime, TTestEnvOptions().EnableProtoSourceIdInfo(true)); + ui64 txId = 100; + + TestCreateTable(runtime, ++txId, "/MyRoot", R"( + Name: "Table" + Columns { Name: "key" Type: "Uint64" } + Columns { Name: "value" Type: "Uint64" } + KeyColumnNames: ["key"] + )"); + env.TestWaitNotification(runtime, txId); + + TestCreateCdcStream(runtime, ++txId, "/MyRoot", R"( + TableName: "Table" + StreamDescription { + Name: "Stream" + Mode: ECdcStreamModeKeysOnly + Format: ECdcStreamFormatProto + } + )"); + env.TestWaitNotification(runtime, txId); + + TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/Stream"), {NLs::PathExist}); + TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/Stream/streamImpl"), {NLs::PathExist}); + + TestCopyTable(runtime, ++txId, "/MyRoot", "TableCopy", "/MyRoot/Table"); + env.TestWaitNotification(runtime, txId); + + TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/TableCopy/Stream"), {NLs::PathNotExist}); + + TestConsistentCopyTables(runtime, ++txId, "/", R"( + CopyTableDescriptions { + SrcPath: "/MyRoot/Table" + DstPath: "/MyRoot/TableConsistentCopy" + } + )"); + env.TestWaitNotification(runtime, txId); + + TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/TableCopy/Stream"), {NLs::PathNotExist}); + } + + Y_UNIT_TEST(MoveTableShouldFail) { + TTestBasicRuntime runtime; + TTestEnv env(runtime, TTestEnvOptions().EnableProtoSourceIdInfo(true)); + ui64 txId = 100; + + TestCreateTable(runtime, ++txId, "/MyRoot", R"( + Name: "Table" + Columns { Name: "key" Type: "Uint64" } + Columns { Name: "value" Type: "Uint64" } + KeyColumnNames: ["key"] + )"); + env.TestWaitNotification(runtime, txId); + + TestCreateCdcStream(runtime, ++txId, "/MyRoot", R"( + TableName: "Table" + StreamDescription { + Name: "Stream" + Mode: ECdcStreamModeKeysOnly + Format: ECdcStreamFormatProto + } + )"); + env.TestWaitNotification(runtime, txId); + + TestMoveTable(runtime, ++txId, "/MyRoot/Table", "/MyRoot/TableMoved", {NKikimrScheme::StatusPreconditionFailed}); + } + } // TCdcStreamTests |