aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorilnaz <ilnaz@ydb.tech>2023-07-17 19:09:25 +0300
committerilnaz <ilnaz@ydb.tech>2023-07-17 19:09:25 +0300
commit05482e640fc81d547fb7607d0e97ab6cdb4233a0 (patch)
treed9e52a95b14899179c6d25f247eeb14ab7ab1cd5
parentf3c0e9b4c9d4e1be2466f3fd3652a7050c689e0b (diff)
downloadydb-05482e640fc81d547fb7607d0e97ab6cdb4233a0.tar.gz
Do not emit heartbeats to disabled stream KIKIMR-18159
-rw-r--r--ydb/core/tx/datashard/alter_cdc_stream_unit.cpp1
-rw-r--r--ydb/core/tx/datashard/datashard_ut_change_exchange.cpp15
2 files changed, 16 insertions, 0 deletions
diff --git a/ydb/core/tx/datashard/alter_cdc_stream_unit.cpp b/ydb/core/tx/datashard/alter_cdc_stream_unit.cpp
index bf3baa3e554..a7b8f793f32 100644
--- a/ydb/core/tx/datashard/alter_cdc_stream_unit.cpp
+++ b/ydb/core/tx/datashard/alter_cdc_stream_unit.cpp
@@ -42,6 +42,7 @@ public:
switch (state) {
case NKikimrSchemeOp::ECdcStreamStateDisabled:
tableInfo = DataShard.AlterTableSwitchCdcStreamState(ctx, txc, pathId, version, streamPathId, state);
+ DataShard.GetCdcStreamHeartbeatManager().DropCdcStream(txc.DB, pathId, streamPathId);
break;
case NKikimrSchemeOp::ECdcStreamStateReady:
diff --git a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp
index 43c221da89a..91b1a52b714 100644
--- a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp
@@ -2737,6 +2737,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
R"({"resolved":"***"})",
});
+ // split table
const auto tabletIds = GetTableShards(server, edgeActor, "/Root/Table");
UNIT_ASSERT_VALUES_EQUAL(tabletIds.size(), 1);
@@ -2752,6 +2753,20 @@ Y_UNIT_TEST_SUITE(Cdc) {
R"({"resolved":"***"})",
R"({"resolved":"***"})",
});
+
+ // disable stream
+ WaitTxNotification(server, edgeActor, AsyncAlterDisableStream(server, "/Root", "Table", "Stream"));
+ SimulateSleep(server, TDuration::Seconds(5));
+
+ WaitForContent(server, edgeActor, "/Root/Table/Stream", {
+ R"({"resolved":"***"})",
+ R"({"update":{"value":10},"key":[1]})",
+ R"({"update":{"value":20},"key":[2]})",
+ R"({"update":{"value":30},"key":[3]})",
+ R"({"resolved":"***"})",
+ R"({"resolved":"***"})",
+ R"({"resolved":"***"})",
+ });
}
Y_UNIT_TEST(InitialScanAndResolvedTimestamps) {