diff options
author | ilnaz <ilnaz@ydb.tech> | 2023-07-17 19:09:25 +0300 |
---|---|---|
committer | ilnaz <ilnaz@ydb.tech> | 2023-07-17 19:09:25 +0300 |
commit | 05482e640fc81d547fb7607d0e97ab6cdb4233a0 (patch) | |
tree | d9e52a95b14899179c6d25f247eeb14ab7ab1cd5 | |
parent | f3c0e9b4c9d4e1be2466f3fd3652a7050c689e0b (diff) | |
download | ydb-05482e640fc81d547fb7607d0e97ab6cdb4233a0.tar.gz |
Do not emit heartbeats to disabled stream KIKIMR-18159
-rw-r--r-- | ydb/core/tx/datashard/alter_cdc_stream_unit.cpp | 1 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_ut_change_exchange.cpp | 15 |
2 files changed, 16 insertions, 0 deletions
diff --git a/ydb/core/tx/datashard/alter_cdc_stream_unit.cpp b/ydb/core/tx/datashard/alter_cdc_stream_unit.cpp index bf3baa3e554..a7b8f793f32 100644 --- a/ydb/core/tx/datashard/alter_cdc_stream_unit.cpp +++ b/ydb/core/tx/datashard/alter_cdc_stream_unit.cpp @@ -42,6 +42,7 @@ public: switch (state) { case NKikimrSchemeOp::ECdcStreamStateDisabled: tableInfo = DataShard.AlterTableSwitchCdcStreamState(ctx, txc, pathId, version, streamPathId, state); + DataShard.GetCdcStreamHeartbeatManager().DropCdcStream(txc.DB, pathId, streamPathId); break; case NKikimrSchemeOp::ECdcStreamStateReady: diff --git a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp index 43c221da89a..91b1a52b714 100644 --- a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp +++ b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp @@ -2737,6 +2737,7 @@ Y_UNIT_TEST_SUITE(Cdc) { R"({"resolved":"***"})", }); + // split table const auto tabletIds = GetTableShards(server, edgeActor, "/Root/Table"); UNIT_ASSERT_VALUES_EQUAL(tabletIds.size(), 1); @@ -2752,6 +2753,20 @@ Y_UNIT_TEST_SUITE(Cdc) { R"({"resolved":"***"})", R"({"resolved":"***"})", }); + + // disable stream + WaitTxNotification(server, edgeActor, AsyncAlterDisableStream(server, "/Root", "Table", "Stream")); + SimulateSleep(server, TDuration::Seconds(5)); + + WaitForContent(server, edgeActor, "/Root/Table/Stream", { + R"({"resolved":"***"})", + R"({"update":{"value":10},"key":[1]})", + R"({"update":{"value":20},"key":[2]})", + R"({"update":{"value":30},"key":[3]})", + R"({"resolved":"***"})", + R"({"resolved":"***"})", + R"({"resolved":"***"})", + }); } Y_UNIT_TEST(InitialScanAndResolvedTimestamps) { |