summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorIlnaz Nizametdinov <[email protected]>2022-04-22 13:39:11 +0300
committerIlnaz Nizametdinov <[email protected]>2022-04-22 13:39:11 +0300
commit7929fa7dc1e20fd956f4b2ac825285683c4b94ed (patch)
tree2ddc2b21251a9bbe25f6f3e9f6cf36be15c9672e
parenta482a42aab20bf1a3bc9ce75fd16c419d2727aee (diff)
Shoud not generate change records for disabled stream KIKIMR-13698
ref:8152875877ddc45cb11d341bc3ce33a47bbcefe2
-rw-r--r--ydb/core/tx/datashard/change_collector_cdc_stream.cpp14
-rw-r--r--ydb/core/tx/datashard/datashard_ut_change_exchange.cpp14
-rw-r--r--ydb/core/tx/datashard/datashard_ut_common.cpp25
-rw-r--r--ydb/core/tx/datashard/datashard_ut_common.h6
4 files changed, 56 insertions, 3 deletions
diff --git a/ydb/core/tx/datashard/change_collector_cdc_stream.cpp b/ydb/core/tx/datashard/change_collector_cdc_stream.cpp
index 24d533a52a3..688b32b23e7 100644
--- a/ydb/core/tx/datashard/change_collector_cdc_stream.cpp
+++ b/ydb/core/tx/datashard/change_collector_cdc_stream.cpp
@@ -62,17 +62,21 @@ bool TCdcStreamChangeCollector::NeedToReadKeys() const {
return *CachedNeedToReadKeys;
}
+ bool value = false;
for (const auto& [_, tableInfo] : Self->GetUserTables()) {
for (const auto& [_, streamInfo] : tableInfo->CdcStreams) {
+ if (streamInfo.State == NKikimrSchemeOp::ECdcStreamStateDisabled) {
+ continue;
+ }
+
switch (streamInfo.Mode) {
case NKikimrSchemeOp::ECdcStreamModeKeysOnly:
case NKikimrSchemeOp::ECdcStreamModeUpdate:
- CachedNeedToReadKeys = false;
break;
case NKikimrSchemeOp::ECdcStreamModeNewImage:
case NKikimrSchemeOp::ECdcStreamModeOldImage:
case NKikimrSchemeOp::ECdcStreamModeNewAndOldImages:
- CachedNeedToReadKeys = true;
+ value = true;
break;
default:
Y_FAIL_S("Invalid stream mode: " << static_cast<ui32>(streamInfo.Mode));
@@ -80,7 +84,7 @@ bool TCdcStreamChangeCollector::NeedToReadKeys() const {
}
}
- Y_VERIFY(CachedNeedToReadKeys);
+ CachedNeedToReadKeys = value;
return *CachedNeedToReadKeys;
}
@@ -113,6 +117,10 @@ bool TCdcStreamChangeCollector::Collect(const TTableId& tableId, ERowOp rop,
bool read = false;
for (const auto& [pathId, stream] : userTable->CdcStreams) {
+ if (stream.State == NKikimrSchemeOp::ECdcStreamStateDisabled) {
+ continue;
+ }
+
switch (stream.Mode) {
case NKikimrSchemeOp::ECdcStreamModeKeysOnly:
Persist(tableId, pathId, rop, key, keyTags, {});
diff --git a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp
index 8ae06589ebe..9a2d721607d 100644
--- a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp
@@ -1347,6 +1347,20 @@ Y_UNIT_TEST_SUITE(Cdc) {
});
}
+ Y_UNIT_TEST(DisableStream) {
+ auto action = [](TServer::TPtr server) {
+ return AsyncAlterDisableStream(server, "/Root", "Table", "Stream");
+ };
+
+ ShouldDeliverChanges(SimpleTable(), Updates(NKikimrSchemeOp::ECdcStreamFormatJson), action, {
+ R"(UPSERT INTO `/Root/Table` (key, value) VALUES (1, 10);)",
+ }, {
+ R"(UPSERT INTO `/Root/Table` (key, value) VALUES (2, 20);)",
+ }, {
+ R"({"update":{"value":10},"key":[1]})",
+ });
+ }
+
// Split/merge
Y_UNIT_TEST(ShouldDeliverChangesOnSplitMerge) {
TTestPqEnv env(SimpleTable(), Updates(NKikimrSchemeOp::ECdcStreamFormatJson), false);
diff --git a/ydb/core/tx/datashard/datashard_ut_common.cpp b/ydb/core/tx/datashard/datashard_ut_common.cpp
index 3d334951406..e83a657ad8a 100644
--- a/ydb/core/tx/datashard/datashard_ut_common.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_common.cpp
@@ -1704,6 +1704,31 @@ ui64 AsyncAlterAddStream(
return ev->Get()->Record.GetTxId();
}
+ui64 AsyncAlterDisableStream(
+ Tests::TServer::TPtr server,
+ const TString& workingDir,
+ const TString& tableName,
+ const TString& streamName)
+{
+ auto &runtime = *server->GetRuntime();
+ auto sender = runtime.AllocateEdgeActor();
+
+ auto request = MakeHolder<TEvTxUserProxy::TEvProposeTransaction>();
+ request->Record.SetExecTimeoutPeriod(Max<ui64>());
+ auto &tx = *request->Record.MutableTransaction()->MutableModifyScheme();
+ tx.SetOperationType(NKikimrSchemeOp::ESchemeOpAlterCdcStream);
+ tx.SetWorkingDir(workingDir);
+ auto &desc = *tx.MutableAlterCdcStream();
+ desc.SetTableName(tableName);
+ desc.SetStreamName(streamName);
+ desc.MutableDisable();
+
+ runtime.Send(new IEventHandle(MakeTxProxyID(), sender, request.Release()));
+ auto ev = runtime.GrabEdgeEventRethrow<TEvTxUserProxy::TEvProposeTransactionStatus>(sender);
+ UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Record.GetStatus(), TEvTxUserProxy::TEvProposeTransactionStatus::EStatus::ExecInProgress);
+ return ev->Get()->Record.GetTxId();
+}
+
ui64 AsyncAlterDropStream(
Tests::TServer::TPtr server,
const TString& workingDir,
diff --git a/ydb/core/tx/datashard/datashard_ut_common.h b/ydb/core/tx/datashard/datashard_ut_common.h
index 144f75526c6..972d0530b83 100644
--- a/ydb/core/tx/datashard/datashard_ut_common.h
+++ b/ydb/core/tx/datashard/datashard_ut_common.h
@@ -586,6 +586,12 @@ ui64 AsyncAlterAddStream(
const TString& tableName,
const TShardedTableOptions::TCdcStream& streamDesc);
+ui64 AsyncAlterDisableStream(
+ Tests::TServer::TPtr server,
+ const TString& workingDir,
+ const TString& tableName,
+ const TString& streamName);
+
ui64 AsyncAlterDropStream(
Tests::TServer::TPtr server,
const TString& workingDir,