diff options
| author | Ilnaz Nizametdinov <[email protected]> | 2022-04-22 13:39:11 +0300 |
|---|---|---|
| committer | Ilnaz Nizametdinov <[email protected]> | 2022-04-22 13:39:11 +0300 |
| commit | 7929fa7dc1e20fd956f4b2ac825285683c4b94ed (patch) | |
| tree | 2ddc2b21251a9bbe25f6f3e9f6cf36be15c9672e | |
| parent | a482a42aab20bf1a3bc9ce75fd16c419d2727aee (diff) | |
Shoud not generate change records for disabled stream KIKIMR-13698
ref:8152875877ddc45cb11d341bc3ce33a47bbcefe2
| -rw-r--r-- | ydb/core/tx/datashard/change_collector_cdc_stream.cpp | 14 | ||||
| -rw-r--r-- | ydb/core/tx/datashard/datashard_ut_change_exchange.cpp | 14 | ||||
| -rw-r--r-- | ydb/core/tx/datashard/datashard_ut_common.cpp | 25 | ||||
| -rw-r--r-- | ydb/core/tx/datashard/datashard_ut_common.h | 6 |
4 files changed, 56 insertions, 3 deletions
diff --git a/ydb/core/tx/datashard/change_collector_cdc_stream.cpp b/ydb/core/tx/datashard/change_collector_cdc_stream.cpp index 24d533a52a3..688b32b23e7 100644 --- a/ydb/core/tx/datashard/change_collector_cdc_stream.cpp +++ b/ydb/core/tx/datashard/change_collector_cdc_stream.cpp @@ -62,17 +62,21 @@ bool TCdcStreamChangeCollector::NeedToReadKeys() const { return *CachedNeedToReadKeys; } + bool value = false; for (const auto& [_, tableInfo] : Self->GetUserTables()) { for (const auto& [_, streamInfo] : tableInfo->CdcStreams) { + if (streamInfo.State == NKikimrSchemeOp::ECdcStreamStateDisabled) { + continue; + } + switch (streamInfo.Mode) { case NKikimrSchemeOp::ECdcStreamModeKeysOnly: case NKikimrSchemeOp::ECdcStreamModeUpdate: - CachedNeedToReadKeys = false; break; case NKikimrSchemeOp::ECdcStreamModeNewImage: case NKikimrSchemeOp::ECdcStreamModeOldImage: case NKikimrSchemeOp::ECdcStreamModeNewAndOldImages: - CachedNeedToReadKeys = true; + value = true; break; default: Y_FAIL_S("Invalid stream mode: " << static_cast<ui32>(streamInfo.Mode)); @@ -80,7 +84,7 @@ bool TCdcStreamChangeCollector::NeedToReadKeys() const { } } - Y_VERIFY(CachedNeedToReadKeys); + CachedNeedToReadKeys = value; return *CachedNeedToReadKeys; } @@ -113,6 +117,10 @@ bool TCdcStreamChangeCollector::Collect(const TTableId& tableId, ERowOp rop, bool read = false; for (const auto& [pathId, stream] : userTable->CdcStreams) { + if (stream.State == NKikimrSchemeOp::ECdcStreamStateDisabled) { + continue; + } + switch (stream.Mode) { case NKikimrSchemeOp::ECdcStreamModeKeysOnly: Persist(tableId, pathId, rop, key, keyTags, {}); diff --git a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp index 8ae06589ebe..9a2d721607d 100644 --- a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp +++ b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp @@ -1347,6 +1347,20 @@ Y_UNIT_TEST_SUITE(Cdc) { }); } + Y_UNIT_TEST(DisableStream) { + auto action = [](TServer::TPtr server) { + return AsyncAlterDisableStream(server, "/Root", "Table", "Stream"); + }; + + ShouldDeliverChanges(SimpleTable(), Updates(NKikimrSchemeOp::ECdcStreamFormatJson), action, { + R"(UPSERT INTO `/Root/Table` (key, value) VALUES (1, 10);)", + }, { + R"(UPSERT INTO `/Root/Table` (key, value) VALUES (2, 20);)", + }, { + R"({"update":{"value":10},"key":[1]})", + }); + } + // Split/merge Y_UNIT_TEST(ShouldDeliverChangesOnSplitMerge) { TTestPqEnv env(SimpleTable(), Updates(NKikimrSchemeOp::ECdcStreamFormatJson), false); diff --git a/ydb/core/tx/datashard/datashard_ut_common.cpp b/ydb/core/tx/datashard/datashard_ut_common.cpp index 3d334951406..e83a657ad8a 100644 --- a/ydb/core/tx/datashard/datashard_ut_common.cpp +++ b/ydb/core/tx/datashard/datashard_ut_common.cpp @@ -1704,6 +1704,31 @@ ui64 AsyncAlterAddStream( return ev->Get()->Record.GetTxId(); } +ui64 AsyncAlterDisableStream( + Tests::TServer::TPtr server, + const TString& workingDir, + const TString& tableName, + const TString& streamName) +{ + auto &runtime = *server->GetRuntime(); + auto sender = runtime.AllocateEdgeActor(); + + auto request = MakeHolder<TEvTxUserProxy::TEvProposeTransaction>(); + request->Record.SetExecTimeoutPeriod(Max<ui64>()); + auto &tx = *request->Record.MutableTransaction()->MutableModifyScheme(); + tx.SetOperationType(NKikimrSchemeOp::ESchemeOpAlterCdcStream); + tx.SetWorkingDir(workingDir); + auto &desc = *tx.MutableAlterCdcStream(); + desc.SetTableName(tableName); + desc.SetStreamName(streamName); + desc.MutableDisable(); + + runtime.Send(new IEventHandle(MakeTxProxyID(), sender, request.Release())); + auto ev = runtime.GrabEdgeEventRethrow<TEvTxUserProxy::TEvProposeTransactionStatus>(sender); + UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Record.GetStatus(), TEvTxUserProxy::TEvProposeTransactionStatus::EStatus::ExecInProgress); + return ev->Get()->Record.GetTxId(); +} + ui64 AsyncAlterDropStream( Tests::TServer::TPtr server, const TString& workingDir, diff --git a/ydb/core/tx/datashard/datashard_ut_common.h b/ydb/core/tx/datashard/datashard_ut_common.h index 144f75526c6..972d0530b83 100644 --- a/ydb/core/tx/datashard/datashard_ut_common.h +++ b/ydb/core/tx/datashard/datashard_ut_common.h @@ -586,6 +586,12 @@ ui64 AsyncAlterAddStream( const TString& tableName, const TShardedTableOptions::TCdcStream& streamDesc); +ui64 AsyncAlterDisableStream( + Tests::TServer::TPtr server, + const TString& workingDir, + const TString& tableName, + const TString& streamName); + ui64 AsyncAlterDropStream( Tests::TServer::TPtr server, const TString& workingDir, |
