aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorIlnaz Nizametdinov <i.nizametdinov@gmail.com>2022-06-06 14:51:41 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-06-06 14:51:41 +0300
commitf3748b98db7af1f1fd7e345bb6e8d452387d3366 (patch)
tree09f04609360efb6dc13bfb7269a04abf2a069ffb
parent94a5b9b2d4bb884f5a964c47268f2821a04a334c (diff)
downloadydb-f3748b98db7af1f1fd7e345bb6e8d452387d3366.tar.gz
22-2: Additional tests KIKIMR-13698
merge from trunk: r9376681, r9381053, r9385261 REVIEW: 2503366 x-ydb-stable-ref: 21ac0cb179e5c675610d7931a5979edbbb12e072
-rw-r--r--ydb/core/tx/datashard/change_collector_cdc_stream.cpp14
-rw-r--r--ydb/core/tx/datashard/datashard_ut_change_exchange.cpp14
-rw-r--r--ydb/core/tx/datashard/datashard_ut_common.cpp25
-rw-r--r--ydb/core/tx/datashard/datashard_ut_common.h6
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_move_tables.cpp22
-rw-r--r--ydb/core/tx/schemeshard/ut_cdc_stream.cpp68
6 files changed, 137 insertions, 12 deletions
diff --git a/ydb/core/tx/datashard/change_collector_cdc_stream.cpp b/ydb/core/tx/datashard/change_collector_cdc_stream.cpp
index 24d533a52a..688b32b23e 100644
--- a/ydb/core/tx/datashard/change_collector_cdc_stream.cpp
+++ b/ydb/core/tx/datashard/change_collector_cdc_stream.cpp
@@ -62,17 +62,21 @@ bool TCdcStreamChangeCollector::NeedToReadKeys() const {
return *CachedNeedToReadKeys;
}
+ bool value = false;
for (const auto& [_, tableInfo] : Self->GetUserTables()) {
for (const auto& [_, streamInfo] : tableInfo->CdcStreams) {
+ if (streamInfo.State == NKikimrSchemeOp::ECdcStreamStateDisabled) {
+ continue;
+ }
+
switch (streamInfo.Mode) {
case NKikimrSchemeOp::ECdcStreamModeKeysOnly:
case NKikimrSchemeOp::ECdcStreamModeUpdate:
- CachedNeedToReadKeys = false;
break;
case NKikimrSchemeOp::ECdcStreamModeNewImage:
case NKikimrSchemeOp::ECdcStreamModeOldImage:
case NKikimrSchemeOp::ECdcStreamModeNewAndOldImages:
- CachedNeedToReadKeys = true;
+ value = true;
break;
default:
Y_FAIL_S("Invalid stream mode: " << static_cast<ui32>(streamInfo.Mode));
@@ -80,7 +84,7 @@ bool TCdcStreamChangeCollector::NeedToReadKeys() const {
}
}
- Y_VERIFY(CachedNeedToReadKeys);
+ CachedNeedToReadKeys = value;
return *CachedNeedToReadKeys;
}
@@ -113,6 +117,10 @@ bool TCdcStreamChangeCollector::Collect(const TTableId& tableId, ERowOp rop,
bool read = false;
for (const auto& [pathId, stream] : userTable->CdcStreams) {
+ if (stream.State == NKikimrSchemeOp::ECdcStreamStateDisabled) {
+ continue;
+ }
+
switch (stream.Mode) {
case NKikimrSchemeOp::ECdcStreamModeKeysOnly:
Persist(tableId, pathId, rop, key, keyTags, {});
diff --git a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp
index 8ae06589eb..9a2d721607 100644
--- a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp
@@ -1347,6 +1347,20 @@ Y_UNIT_TEST_SUITE(Cdc) {
});
}
+ Y_UNIT_TEST(DisableStream) {
+ auto action = [](TServer::TPtr server) {
+ return AsyncAlterDisableStream(server, "/Root", "Table", "Stream");
+ };
+
+ ShouldDeliverChanges(SimpleTable(), Updates(NKikimrSchemeOp::ECdcStreamFormatJson), action, {
+ R"(UPSERT INTO `/Root/Table` (key, value) VALUES (1, 10);)",
+ }, {
+ R"(UPSERT INTO `/Root/Table` (key, value) VALUES (2, 20);)",
+ }, {
+ R"({"update":{"value":10},"key":[1]})",
+ });
+ }
+
// Split/merge
Y_UNIT_TEST(ShouldDeliverChangesOnSplitMerge) {
TTestPqEnv env(SimpleTable(), Updates(NKikimrSchemeOp::ECdcStreamFormatJson), false);
diff --git a/ydb/core/tx/datashard/datashard_ut_common.cpp b/ydb/core/tx/datashard/datashard_ut_common.cpp
index 7c2b78d6a6..bb6d0af58b 100644
--- a/ydb/core/tx/datashard/datashard_ut_common.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_common.cpp
@@ -1704,6 +1704,31 @@ ui64 AsyncAlterAddStream(
return ev->Get()->Record.GetTxId();
}
+ui64 AsyncAlterDisableStream(
+ Tests::TServer::TPtr server,
+ const TString& workingDir,
+ const TString& tableName,
+ const TString& streamName)
+{
+ auto &runtime = *server->GetRuntime();
+ auto sender = runtime.AllocateEdgeActor();
+
+ auto request = MakeHolder<TEvTxUserProxy::TEvProposeTransaction>();
+ request->Record.SetExecTimeoutPeriod(Max<ui64>());
+ auto &tx = *request->Record.MutableTransaction()->MutableModifyScheme();
+ tx.SetOperationType(NKikimrSchemeOp::ESchemeOpAlterCdcStream);
+ tx.SetWorkingDir(workingDir);
+ auto &desc = *tx.MutableAlterCdcStream();
+ desc.SetTableName(tableName);
+ desc.SetStreamName(streamName);
+ desc.MutableDisable();
+
+ runtime.Send(new IEventHandle(MakeTxProxyID(), sender, request.Release()));
+ auto ev = runtime.GrabEdgeEventRethrow<TEvTxUserProxy::TEvProposeTransactionStatus>(sender);
+ UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Record.GetStatus(), TEvTxUserProxy::TEvProposeTransactionStatus::EStatus::ExecInProgress);
+ return ev->Get()->Record.GetTxId();
+}
+
ui64 AsyncAlterDropStream(
Tests::TServer::TPtr server,
const TString& workingDir,
diff --git a/ydb/core/tx/datashard/datashard_ut_common.h b/ydb/core/tx/datashard/datashard_ut_common.h
index 144f75526c..972d0530b8 100644
--- a/ydb/core/tx/datashard/datashard_ut_common.h
+++ b/ydb/core/tx/datashard/datashard_ut_common.h
@@ -586,6 +586,12 @@ ui64 AsyncAlterAddStream(
const TString& tableName,
const TShardedTableOptions::TCdcStream& streamDesc);
+ui64 AsyncAlterDisableStream(
+ Tests::TServer::TPtr server,
+ const TString& workingDir,
+ const TString& tableName,
+ const TString& streamName);
+
ui64 AsyncAlterDropStream(
Tests::TServer::TPtr server,
const TString& workingDir,
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_move_tables.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_move_tables.cpp
index 8403511636..3e287ba3c1 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_move_tables.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_move_tables.cpp
@@ -97,27 +97,31 @@ TVector<ISubOperationBase::TPtr> CreateConsistentMoveTable(TOperationId nextId,
for (auto& child: srcPath.Base()->GetChildren()) {
auto name = child.first;
- TPath srcIndexPath = srcPath.Child(name);
- if (srcIndexPath.IsDeleted()) {
+ TPath srcChildPath = srcPath.Child(name);
+ if (srcChildPath.IsDeleted()) {
continue;
}
+ if (srcChildPath.IsCdcStream()) {
+ return {CreateReject(nextId, NKikimrScheme::StatusPreconditionFailed, "Cannot move table with cdc streams")};
+ }
+
TPath dstIndexPath = dstPath.Child(name);
- Y_VERIFY(srcIndexPath.Base()->PathId == child.second);
- Y_VERIFY_S(srcIndexPath.Base()->GetChildren().size() == 1,
- srcIndexPath.PathString() << " has children " << srcIndexPath.Base()->GetChildren().size());
+ Y_VERIFY(srcChildPath.Base()->PathId == child.second);
+ Y_VERIFY_S(srcChildPath.Base()->GetChildren().size() == 1,
+ srcChildPath.PathString() << " has children " << srcChildPath.Base()->GetChildren().size());
result.push_back(CreateMoveTableIndex(TOperationId(nextId.GetTxId(),
nextId.GetSubTxId() + result.size()),
- MoveTableIndexTask(srcIndexPath, dstIndexPath)));
+ MoveTableIndexTask(srcChildPath, dstIndexPath)));
- TString srcImplTableName = srcIndexPath.Base()->GetChildren().begin()->first;
- TPath srcImplTable = srcIndexPath.Child(srcImplTableName);
+ TString srcImplTableName = srcChildPath.Base()->GetChildren().begin()->first;
+ TPath srcImplTable = srcChildPath.Child(srcImplTableName);
if (srcImplTable.IsDeleted()) {
continue;
}
- Y_VERIFY(srcImplTable.Base()->PathId == srcIndexPath.Base()->GetChildren().begin()->second);
+ Y_VERIFY(srcImplTable.Base()->PathId == srcChildPath.Base()->GetChildren().begin()->second);
TPath dstImplTable = dstIndexPath.Child(srcImplTableName);
diff --git a/ydb/core/tx/schemeshard/ut_cdc_stream.cpp b/ydb/core/tx/schemeshard/ut_cdc_stream.cpp
index aedf7a458e..a678d77ab9 100644
--- a/ydb/core/tx/schemeshard/ut_cdc_stream.cpp
+++ b/ydb/core/tx/schemeshard/ut_cdc_stream.cpp
@@ -397,4 +397,72 @@ Y_UNIT_TEST_SUITE(TCdcStreamTests) {
TestDropPQGroup(runtime, ++txId, "/MyRoot/Table/Stream", "streamImpl", {NKikimrScheme::StatusNameConflict});
}
+ Y_UNIT_TEST(CopyTableShouldNotCopyStream) {
+ TTestBasicRuntime runtime;
+ TTestEnv env(runtime, TTestEnvOptions().EnableProtoSourceIdInfo(true));
+ ui64 txId = 100;
+
+ TestCreateTable(runtime, ++txId, "/MyRoot", R"(
+ Name: "Table"
+ Columns { Name: "key" Type: "Uint64" }
+ Columns { Name: "value" Type: "Uint64" }
+ KeyColumnNames: ["key"]
+ )");
+ env.TestWaitNotification(runtime, txId);
+
+ TestCreateCdcStream(runtime, ++txId, "/MyRoot", R"(
+ TableName: "Table"
+ StreamDescription {
+ Name: "Stream"
+ Mode: ECdcStreamModeKeysOnly
+ Format: ECdcStreamFormatProto
+ }
+ )");
+ env.TestWaitNotification(runtime, txId);
+
+ TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/Stream"), {NLs::PathExist});
+ TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/Stream/streamImpl"), {NLs::PathExist});
+
+ TestCopyTable(runtime, ++txId, "/MyRoot", "TableCopy", "/MyRoot/Table");
+ env.TestWaitNotification(runtime, txId);
+
+ TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/TableCopy/Stream"), {NLs::PathNotExist});
+
+ TestConsistentCopyTables(runtime, ++txId, "/", R"(
+ CopyTableDescriptions {
+ SrcPath: "/MyRoot/Table"
+ DstPath: "/MyRoot/TableConsistentCopy"
+ }
+ )");
+ env.TestWaitNotification(runtime, txId);
+
+ TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/TableCopy/Stream"), {NLs::PathNotExist});
+ }
+
+ Y_UNIT_TEST(MoveTableShouldFail) {
+ TTestBasicRuntime runtime;
+ TTestEnv env(runtime, TTestEnvOptions().EnableProtoSourceIdInfo(true));
+ ui64 txId = 100;
+
+ TestCreateTable(runtime, ++txId, "/MyRoot", R"(
+ Name: "Table"
+ Columns { Name: "key" Type: "Uint64" }
+ Columns { Name: "value" Type: "Uint64" }
+ KeyColumnNames: ["key"]
+ )");
+ env.TestWaitNotification(runtime, txId);
+
+ TestCreateCdcStream(runtime, ++txId, "/MyRoot", R"(
+ TableName: "Table"
+ StreamDescription {
+ Name: "Stream"
+ Mode: ECdcStreamModeKeysOnly
+ Format: ECdcStreamFormatProto
+ }
+ )");
+ env.TestWaitNotification(runtime, txId);
+
+ TestMoveTable(runtime, ++txId, "/MyRoot/Table", "/MyRoot/TableMoved", {NKikimrScheme::StatusPreconditionFailed});
+ }
+
} // TCdcStreamTests