aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorIlnaz Nizametdinov <i.nizametdinov@gmail.com>2022-02-24 13:00:10 +0300
committerIlnaz Nizametdinov <i.nizametdinov@gmail.com>2022-02-24 13:00:10 +0300
commit28f4ebc21f1f4c7d2e92741887e1f508489c203b (patch)
treed41cfc9c86e2b6f7a05959230fda0cfc9aae7a3d
parent0cd396f47ddebe98614ce96b7b0394a0a892b7cc (diff)
downloadydb-28f4ebc21f1f4c7d2e92741887e1f508489c203b.tar.gz
Different stream formats in schema KIKIMR-14198
ref:d4333c42d2f73e80f858c71f5e0f465715bc3376
-rw-r--r--ydb/core/protos/flat_scheme_op.proto7
-rw-r--r--ydb/core/tx/datashard/datashard_ut_change_collector.cpp12
-rw-r--r--ydb/core/tx/datashard/datashard_ut_common.cpp1
-rw-r--r--ydb/core/tx/datashard/datashard_ut_common.h2
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__init.cpp8
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp10
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_impl.cpp2
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_info_types.h11
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_path_describer.cpp1
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_schema.h6
-rw-r--r--ydb/core/tx/schemeshard/ut_cdc_stream.cpp27
-rw-r--r--ydb/core/tx/schemeshard/ut_cdc_stream_reboots.cpp19
-rw-r--r--ydb/core/tx/schemeshard/ut_helpers/ls_checks.cpp30
-rw-r--r--ydb/core/tx/schemeshard/ut_helpers/ls_checks.h4
-rw-r--r--ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema16
15 files changed, 141 insertions, 15 deletions
diff --git a/ydb/core/protos/flat_scheme_op.proto b/ydb/core/protos/flat_scheme_op.proto
index 33e598c1c2..385b647c5e 100644
--- a/ydb/core/protos/flat_scheme_op.proto
+++ b/ydb/core/protos/flat_scheme_op.proto
@@ -710,9 +710,16 @@ enum ECdcStreamMode {
ECdcStreamModeNewAndOldImages = 5;
}
+enum ECdcStreamFormat {
+ ECdcStreamFormatInvalid = 0;
+ ECdcStreamFormatProto = 1;
+ ECdcStreamFormatJson = 2;
+}
+
message TCdcStreamDescription {
optional string Name = 1;
optional ECdcStreamMode Mode = 2;
+ optional ECdcStreamFormat Format = 6;
optional NKikimrProto.TPathID PathId = 3;
optional ECdcStreamState State = 4;
optional uint64 SchemaVersion = 5;
diff --git a/ydb/core/tx/datashard/datashard_ut_change_collector.cpp b/ydb/core/tx/datashard/datashard_ut_change_collector.cpp
index 5f3f8524b9..fc406f6f42 100644
--- a/ydb/core/tx/datashard/datashard_ut_change_collector.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_change_collector.cpp
@@ -669,19 +669,25 @@ Y_UNIT_TEST_SUITE(CdcStreamChangeCollector) {
TCdcStream KeysOnly() {
return TCdcStream{
- "keys_stream", NKikimrSchemeOp::ECdcStreamModeKeysOnly
+ .Name = "keys_stream",
+ .Mode = NKikimrSchemeOp::ECdcStreamModeKeysOnly,
+ .Format = NKikimrSchemeOp::ECdcStreamFormatProto,
};
}
TCdcStream Updates() {
return TCdcStream{
- "updates_stream", NKikimrSchemeOp::ECdcStreamModeUpdate
+ .Name = "updates_stream",
+ .Mode = NKikimrSchemeOp::ECdcStreamModeUpdate,
+ .Format = NKikimrSchemeOp::ECdcStreamFormatProto,
};
}
TCdcStream NewAndOldImages() {
return TCdcStream{
- "new_and_old_images", NKikimrSchemeOp::ECdcStreamModeNewAndOldImages
+ .Name = "new_and_old_images",
+ .Mode = NKikimrSchemeOp::ECdcStreamModeNewAndOldImages,
+ .Format = NKikimrSchemeOp::ECdcStreamFormatProto,
};
}
diff --git a/ydb/core/tx/datashard/datashard_ut_common.cpp b/ydb/core/tx/datashard/datashard_ut_common.cpp
index e40e8eee88..ba8d92e6e2 100644
--- a/ydb/core/tx/datashard/datashard_ut_common.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_common.cpp
@@ -1673,6 +1673,7 @@ ui64 AsyncAlterAddStream(
desc.SetTableName(tableName);
desc.MutableStreamDescription()->SetName(streamDesc.Name);
desc.MutableStreamDescription()->SetMode(streamDesc.Mode);
+ desc.MutableStreamDescription()->SetFormat(streamDesc.Format);
runtime.Send(new IEventHandle(MakeTxProxyID(), sender, request.Release()));
auto ev = runtime.GrabEdgeEventRethrow<TEvTxUserProxy::TEvProposeTransactionStatus>(sender);
diff --git a/ydb/core/tx/datashard/datashard_ut_common.h b/ydb/core/tx/datashard/datashard_ut_common.h
index 99ec4bc979..0bf3c88942 100644
--- a/ydb/core/tx/datashard/datashard_ut_common.h
+++ b/ydb/core/tx/datashard/datashard_ut_common.h
@@ -394,9 +394,11 @@ struct TShardedTableOptions {
struct TCdcStream {
using EMode = NKikimrSchemeOp::ECdcStreamMode;
+ using EFormat = NKikimrSchemeOp::ECdcStreamFormat;
TString Name;
EMode Mode;
+ EFormat Format;
};
#define TABLE_OPTION_IMPL(type, name, defaultValue) \
diff --git a/ydb/core/tx/schemeshard/schemeshard__init.cpp b/ydb/core/tx/schemeshard/schemeshard__init.cpp
index 1560794a49..431ad31be3 100644
--- a/ydb/core/tx/schemeshard/schemeshard__init.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__init.cpp
@@ -2734,6 +2734,7 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
);
auto alterVersion = rowset.GetValue<Schema::CdcStream::AlterVersion>();
auto mode = rowset.GetValue<Schema::CdcStream::Mode>();
+ auto format = rowset.GetValue<Schema::CdcStream::Format>();
auto state = rowset.GetValue<Schema::CdcStream::State>();
Y_VERIFY_S(Self->PathsById.contains(pathId), "Path doesn't exist, pathId: " << pathId);
@@ -2744,7 +2745,7 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
<< ", path type: " << NKikimrSchemeOp::EPathType_Name(path->PathType));
Y_VERIFY(!Self->CdcStreams.contains(pathId));
- Self->CdcStreams[pathId] = new TCdcStreamInfo(alterVersion, mode, state);
+ Self->CdcStreams[pathId] = new TCdcStreamInfo(alterVersion, mode, format, state);
Self->IncrementPathDbRefCount(pathId);
if (!rowset.Next()) {
@@ -2768,6 +2769,7 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
auto alterVersion = rowset.GetValue<Schema::CdcStreamAlterData::AlterVersion>();
auto mode = rowset.GetValue<Schema::CdcStreamAlterData::Mode>();
+ auto format = rowset.GetValue<Schema::CdcStreamAlterData::Format>();
auto state = rowset.GetValue<Schema::CdcStreamAlterData::State>();
Y_VERIFY_S(Self->PathsById.contains(pathId), "Path doesn't exist, pathId: " << pathId);
@@ -2779,14 +2781,14 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
if (!Self->CdcStreams.contains(pathId)) {
Y_VERIFY(alterVersion == 1);
- Self->CdcStreams[pathId] = TCdcStreamInfo::New(mode);
+ Self->CdcStreams[pathId] = TCdcStreamInfo::New(mode, format);
Self->IncrementPathDbRefCount(pathId);
}
auto stream = Self->CdcStreams.at(pathId);
Y_VERIFY(stream->AlterData == nullptr);
Y_VERIFY(stream->AlterVersion < alterVersion);
- stream->AlterData = new TCdcStreamInfo(alterVersion, mode, state);
+ stream->AlterData = new TCdcStreamInfo(alterVersion, mode, format, state);
Y_VERIFY_S(Self->PathsById.contains(path->ParentPathId), "Parent path is not found"
<< ", cdc stream pathId: " << pathId
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp
index 01e4cd9df8..e8a91bf4bd 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp
@@ -209,6 +209,16 @@ public:
return result;
}
+ switch (streamDesc.GetFormat()) {
+ case NKikimrSchemeOp::ECdcStreamFormatProto:
+ case NKikimrSchemeOp::ECdcStreamFormatJson:
+ break;
+ default:
+ result->SetError(NKikimrScheme::StatusInvalidParameter, TStringBuilder()
+ << "Invalid stream format: " << static_cast<ui32>(streamDesc.GetFormat()));
+ return result;
+ }
+
TString errStr;
if (!context.SS->CheckLocks(tablePath.Base()->PathId, Transaction, errStr)) {
result->SetError(NKikimrScheme::StatusMultipleModifications, errStr);
diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.cpp b/ydb/core/tx/schemeshard/schemeshard_impl.cpp
index 640c0b0775..d88e53c376 100644
--- a/ydb/core/tx/schemeshard/schemeshard_impl.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard_impl.cpp
@@ -1410,6 +1410,7 @@ void TSchemeShard::PersistCdcStream(NIceDb::TNiceDb& db, const TPathId& pathId)
db.Table<Schema::CdcStream>().Key(pathId.OwnerId, pathId.LocalPathId).Update(
NIceDb::TUpdate<Schema::CdcStream::AlterVersion>(alterData->AlterVersion),
NIceDb::TUpdate<Schema::CdcStream::Mode>(alterData->Mode),
+ NIceDb::TUpdate<Schema::CdcStream::Format>(alterData->Format),
NIceDb::TUpdate<Schema::CdcStream::State>(alterData->State)
);
@@ -1432,6 +1433,7 @@ void TSchemeShard::PersistCdcStreamAlterData(NIceDb::TNiceDb& db, const TPathId&
db.Table<Schema::CdcStreamAlterData>().Key(pathId.OwnerId, pathId.LocalPathId).Update(
NIceDb::TUpdate<Schema::CdcStreamAlterData::AlterVersion>(alterData->AlterVersion),
NIceDb::TUpdate<Schema::CdcStreamAlterData::Mode>(alterData->Mode),
+ NIceDb::TUpdate<Schema::CdcStreamAlterData::Format>(alterData->Format),
NIceDb::TUpdate<Schema::CdcStreamAlterData::State>(alterData->State)
);
}
diff --git a/ydb/core/tx/schemeshard/schemeshard_info_types.h b/ydb/core/tx/schemeshard/schemeshard_info_types.h
index fd0bfd5183..591e6ceb3c 100644
--- a/ydb/core/tx/schemeshard/schemeshard_info_types.h
+++ b/ydb/core/tx/schemeshard/schemeshard_info_types.h
@@ -2103,11 +2103,13 @@ struct TTableIndexInfo : public TSimpleRefCount<TTableIndexInfo> {
struct TCdcStreamInfo : public TSimpleRefCount<TCdcStreamInfo> {
using TPtr = TIntrusivePtr<TCdcStreamInfo>;
using EMode = NKikimrSchemeOp::ECdcStreamMode;
+ using EFormat = NKikimrSchemeOp::ECdcStreamFormat;
using EState = NKikimrSchemeOp::ECdcStreamState;
- TCdcStreamInfo(ui64 version, EMode mode, EState state)
+ TCdcStreamInfo(ui64 version, EMode mode, EFormat format, EState state)
: AlterVersion(version)
, Mode(mode)
+ , Format(format)
, State(state)
{}
@@ -2121,12 +2123,12 @@ struct TCdcStreamInfo : public TSimpleRefCount<TCdcStreamInfo> {
return result;
}
- static TPtr New(EMode mode) {
- return new TCdcStreamInfo(0, mode, EState::ECdcStreamStateInvalid);
+ static TPtr New(EMode mode, EFormat format) {
+ return new TCdcStreamInfo(0, mode, format, EState::ECdcStreamStateInvalid);
}
static TPtr Create(const NKikimrSchemeOp::TCdcStreamDescription& desc) {
- TPtr result = New(desc.GetMode());
+ TPtr result = New(desc.GetMode(), desc.GetFormat());
TPtr alterData = result->CreateNextVersion();
alterData->State = EState::ECdcStreamStateReady;
@@ -2135,6 +2137,7 @@ struct TCdcStreamInfo : public TSimpleRefCount<TCdcStreamInfo> {
ui64 AlterVersion = 1;
EMode Mode;
+ EFormat Format;
EState State;
TCdcStreamInfo::TPtr AlterData = nullptr;
diff --git a/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp b/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp
index 3789d3af1b..4b178640bf 100644
--- a/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp
@@ -1027,6 +1027,7 @@ void TSchemeShard::DescribeCdcStream(const TPathId& pathId, const TString& name,
desc.SetName(name);
desc.SetMode(info->Mode);
+ desc.SetFormat(info->Format);
PathIdFromPathId(pathId, desc.MutablePathId());
desc.SetState(info->State);
desc.SetSchemaVersion(info->AlterVersion);
diff --git a/ydb/core/tx/schemeshard/schemeshard_schema.h b/ydb/core/tx/schemeshard/schemeshard_schema.h
index cc19895d88..2a4e3bc598 100644
--- a/ydb/core/tx/schemeshard/schemeshard_schema.h
+++ b/ydb/core/tx/schemeshard/schemeshard_schema.h
@@ -1537,9 +1537,10 @@ struct Schema : NIceDb::Schema {
struct AlterVersion : Column<3, NScheme::NTypeIds::Uint64> {};
struct State : Column<4, NScheme::NTypeIds::Uint32> { using Type = NKikimrSchemeOp::ECdcStreamState; static constexpr Type Default = NKikimrSchemeOp::ECdcStreamStateInvalid; };
struct Mode : Column<5, NScheme::NTypeIds::Uint32> { using Type = NKikimrSchemeOp::ECdcStreamMode; static constexpr Type Default = NKikimrSchemeOp::ECdcStreamModeInvalid; };
+ struct Format : Column<6, NScheme::NTypeIds::Uint32> { using Type = NKikimrSchemeOp::ECdcStreamFormat; static constexpr Type Default = NKikimrSchemeOp::ECdcStreamFormatInvalid; };
using TKey = TableKey<OwnerPathId, LocalPathId>;
- using TColumns = TableColumns<OwnerPathId, LocalPathId, AlterVersion, State, Mode>;
+ using TColumns = TableColumns<OwnerPathId, LocalPathId, AlterVersion, State, Mode, Format>;
};
struct CdcStreamAlterData : Table<96> {
@@ -1548,9 +1549,10 @@ struct Schema : NIceDb::Schema {
struct AlterVersion : Column<3, NScheme::NTypeIds::Uint64> {};
struct State : Column<4, NScheme::NTypeIds::Uint32> { using Type = NKikimrSchemeOp::ECdcStreamState; static constexpr Type Default = NKikimrSchemeOp::ECdcStreamStateInvalid; };
struct Mode : Column<5, NScheme::NTypeIds::Uint32> { using Type = NKikimrSchemeOp::ECdcStreamMode; static constexpr Type Default = NKikimrSchemeOp::ECdcStreamModeInvalid; };
+ struct Format : Column<6, NScheme::NTypeIds::Uint32> { using Type = NKikimrSchemeOp::ECdcStreamFormat; static constexpr Type Default = NKikimrSchemeOp::ECdcStreamFormatInvalid; };
using TKey = TableKey<OwnerPathId, LocalPathId>;
- using TColumns = TableColumns<OwnerPathId, LocalPathId, AlterVersion, State, Mode>;
+ using TColumns = TableColumns<OwnerPathId, LocalPathId, AlterVersion, State, Mode, Format>;
};
struct Sequences : Table<97> {
diff --git a/ydb/core/tx/schemeshard/ut_cdc_stream.cpp b/ydb/core/tx/schemeshard/ut_cdc_stream.cpp
index 5090fe7df6..7bdbf5327f 100644
--- a/ydb/core/tx/schemeshard/ut_cdc_stream.cpp
+++ b/ydb/core/tx/schemeshard/ut_cdc_stream.cpp
@@ -23,11 +23,17 @@ Y_UNIT_TEST_SUITE(TCdcStreamTests) {
StreamDescription {
Name: "Stream"
Mode: ECdcStreamModeKeysOnly
+ Format: ECdcStreamFormatProto
}
)");
env.TestWaitNotification(runtime, txId);
- TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/Stream"), {NLs::PathExist});
+ TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/Stream"), {
+ NLs::PathExist,
+ NLs::StreamMode(NKikimrSchemeOp::ECdcStreamModeKeysOnly),
+ NLs::StreamFormat(NKikimrSchemeOp::ECdcStreamFormatProto),
+ NLs::StreamState(NKikimrSchemeOp::ECdcStreamStateReady),
+ });
TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/Stream/streamImpl"), {NLs::PathExist});
TestAlterCdcStream(runtime, ++txId, "/MyRoot", R"(
@@ -37,6 +43,13 @@ Y_UNIT_TEST_SUITE(TCdcStreamTests) {
)");
env.TestWaitNotification(runtime, txId);
+ TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/Stream"), {
+ NLs::PathExist,
+ NLs::StreamMode(NKikimrSchemeOp::ECdcStreamModeKeysOnly),
+ NLs::StreamFormat(NKikimrSchemeOp::ECdcStreamFormatProto),
+ NLs::StreamState(NKikimrSchemeOp::ECdcStreamStateDisabled),
+ });
+
TestDropCdcStream(runtime, ++txId, "/MyRoot", R"(
TableName: "Table"
StreamName: "Stream"
@@ -57,6 +70,7 @@ Y_UNIT_TEST_SUITE(TCdcStreamTests) {
StreamDescription {
Name: "Stream"
Mode: ECdcStreamModeKeysOnly
+ Format: ECdcStreamFormatProto
}
)", {NKikimrScheme::StatusNameConflict});
@@ -65,6 +79,7 @@ Y_UNIT_TEST_SUITE(TCdcStreamTests) {
StreamDescription {
Name: "Stream"
Mode: ECdcStreamModeKeysOnly
+ Format: ECdcStreamFormatProto
}
)", {NKikimrScheme::StatusPathDoesNotExist});
@@ -87,6 +102,7 @@ Y_UNIT_TEST_SUITE(TCdcStreamTests) {
StreamDescription {
Name: "Index"
Mode: ECdcStreamModeKeysOnly
+ Format: ECdcStreamFormatProto
}
)", {NKikimrScheme::StatusNameConflict});
@@ -95,6 +111,7 @@ Y_UNIT_TEST_SUITE(TCdcStreamTests) {
StreamDescription {
Name: "Stream"
Mode: ECdcStreamModeKeysOnly
+ Format: ECdcStreamFormatProto
}
)", {NKikimrScheme::StatusNameConflict});
@@ -110,6 +127,7 @@ Y_UNIT_TEST_SUITE(TCdcStreamTests) {
StreamDescription {
Name: "Stream"
Mode: ECdcStreamModeKeysOnly
+ Format: ECdcStreamFormatProto
}
)");
env.TestWaitNotification(runtime, txId);
@@ -122,6 +140,7 @@ Y_UNIT_TEST_SUITE(TCdcStreamTests) {
StreamDescription {
Name: "Stream"
Mode: ECdcStreamModeKeysOnly
+ Format: ECdcStreamFormatProto
}
)", {NKikimrScheme::StatusPathDoesNotExist});
}
@@ -168,6 +187,7 @@ Y_UNIT_TEST_SUITE(TCdcStreamTests) {
StreamDescription {
Name: "Stream"
Mode: ECdcStreamModeKeysOnly
+ Format: ECdcStreamFormatProto
}
)");
env.TestWaitNotification(runtime, txId);
@@ -211,6 +231,7 @@ Y_UNIT_TEST_SUITE(TCdcStreamTests) {
StreamDescription {
Name: "Stream"
Mode: ECdcStreamModeKeysOnly
+ Format: ECdcStreamFormatProto
}
)");
env.TestWaitNotification(runtime, txId);
@@ -242,6 +263,7 @@ Y_UNIT_TEST_SUITE(TCdcStreamTests) {
StreamDescription {
Name: "Stream"
Mode: ECdcStreamModeKeysOnly
+ Format: ECdcStreamFormatProto
}
)");
env.TestWaitNotification(runtime, txId);
@@ -267,6 +289,7 @@ Y_UNIT_TEST_SUITE(TCdcStreamTests) {
StreamDescription {
Name: "Stream"
Mode: ECdcStreamModeKeysOnly
+ Format: ECdcStreamFormatProto
}
)");
env.TestWaitNotification(runtime, txId);
@@ -297,6 +320,7 @@ Y_UNIT_TEST_SUITE(TCdcStreamTests) {
StreamDescription {
Name: "Stream"
Mode: ECdcStreamModeKeysOnly
+ Format: ECdcStreamFormatProto
}
)");
env.TestWaitNotification(runtime, txId);
@@ -325,6 +349,7 @@ Y_UNIT_TEST_SUITE(TCdcStreamTests) {
StreamDescription {
Name: "Stream"
Mode: ECdcStreamModeKeysOnly
+ Format: ECdcStreamFormatProto
}
)");
env.TestWaitNotification(runtime, txId);
diff --git a/ydb/core/tx/schemeshard/ut_cdc_stream_reboots.cpp b/ydb/core/tx/schemeshard/ut_cdc_stream_reboots.cpp
index 376544ace8..c512c74f30 100644
--- a/ydb/core/tx/schemeshard/ut_cdc_stream_reboots.cpp
+++ b/ydb/core/tx/schemeshard/ut_cdc_stream_reboots.cpp
@@ -22,6 +22,7 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) {
StreamDescription {
Name: "Stream"
Mode: ECdcStreamModeKeysOnly
+ Format: ECdcStreamFormatProto
}
)");
t.TestEnv->TestWaitNotification(runtime, t.TxId);
@@ -48,9 +49,17 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) {
StreamDescription {
Name: "Stream"
Mode: ECdcStreamModeKeysOnly
+ Format: ECdcStreamFormatProto
}
)");
t.TestEnv->TestWaitNotification(runtime, t.TxId);
+
+ TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/Stream"), {
+ NLs::PathExist,
+ NLs::StreamMode(NKikimrSchemeOp::ECdcStreamModeKeysOnly),
+ NLs::StreamFormat(NKikimrSchemeOp::ECdcStreamFormatProto),
+ NLs::StreamState(NKikimrSchemeOp::ECdcStreamStateReady),
+ });
}
auto request = AlterCdcStreamRequest(++t.TxId, "/MyRoot", R"(
@@ -63,6 +72,13 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) {
NKikimrScheme::StatusMultipleModifications,
});
t.TestEnv->TestWaitNotification(runtime, t.TxId);
+
+ TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/Stream"), {
+ NLs::PathExist,
+ NLs::StreamMode(NKikimrSchemeOp::ECdcStreamModeKeysOnly),
+ NLs::StreamFormat(NKikimrSchemeOp::ECdcStreamFormatProto),
+ NLs::StreamState(NKikimrSchemeOp::ECdcStreamStateDisabled),
+ });
});
}
@@ -84,6 +100,7 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) {
StreamDescription {
Name: "Stream"
Mode: ECdcStreamModeKeysOnly
+ Format: ECdcStreamFormatProto
}
)");
t.TestEnv->TestWaitNotification(runtime, t.TxId);
@@ -119,6 +136,7 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) {
StreamDescription {
Name: "Stream"
Mode: ECdcStreamModeKeysOnly
+ Format: ECdcStreamFormatProto
}
)");
t.TestEnv->ReliablePropose(runtime, request, {
@@ -149,6 +167,7 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) {
StreamDescription {
Name: "Stream"
Mode: ECdcStreamModeKeysOnly
+ Format: ECdcStreamFormatProto
}
)");
t.TestEnv->ReliablePropose(runtime, request, {
diff --git a/ydb/core/tx/schemeshard/ut_helpers/ls_checks.cpp b/ydb/core/tx/schemeshard/ut_helpers/ls_checks.cpp
index c5d6c5a973..4560c2b96f 100644
--- a/ydb/core/tx/schemeshard/ut_helpers/ls_checks.cpp
+++ b/ydb/core/tx/schemeshard/ut_helpers/ls_checks.cpp
@@ -681,6 +681,24 @@ TCheckFunc IndexDataColumns(const TVector<TString>& dataColumnNames) {
};
}
+TCheckFunc StreamMode(NKikimrSchemeOp::ECdcStreamMode mode) {
+ return [=] (const NKikimrScheme::TEvDescribeSchemeResult& record) {
+ UNIT_ASSERT_VALUES_EQUAL(record.GetPathDescription().GetCdcStreamDescription().GetMode(), mode);
+ };
+}
+
+TCheckFunc StreamFormat(NKikimrSchemeOp::ECdcStreamFormat format) {
+ return [=] (const NKikimrScheme::TEvDescribeSchemeResult& record) {
+ UNIT_ASSERT_VALUES_EQUAL(record.GetPathDescription().GetCdcStreamDescription().GetFormat(), format);
+ };
+}
+
+TCheckFunc StreamState(NKikimrSchemeOp::ECdcStreamState state) {
+ return [=] (const NKikimrScheme::TEvDescribeSchemeResult& record) {
+ UNIT_ASSERT_VALUES_EQUAL(record.GetPathDescription().GetCdcStreamDescription().GetState(), state);
+ };
+}
+
void NoChildren(const NKikimrScheme::TEvDescribeSchemeResult& record) {
ChildrenCount(0)(record);
}
@@ -1010,3 +1028,15 @@ Y_DECLARE_OUT_SPEC(inline, NKikimrSchemeOp::EIndexType, o, x) {
Y_DECLARE_OUT_SPEC(inline, NKikimrSchemeOp::EIndexState, o, x) {
o << NKikimrSchemeOp::EIndexState_Name(x);
}
+
+Y_DECLARE_OUT_SPEC(inline, NKikimrSchemeOp::ECdcStreamMode, o, x) {
+ o << NKikimrSchemeOp::ECdcStreamMode_Name(x);
+}
+
+Y_DECLARE_OUT_SPEC(inline, NKikimrSchemeOp::ECdcStreamFormat, o, x) {
+ o << NKikimrSchemeOp::ECdcStreamFormat_Name(x);
+}
+
+Y_DECLARE_OUT_SPEC(inline, NKikimrSchemeOp::ECdcStreamState, o, x) {
+ o << NKikimrSchemeOp::ECdcStreamState_Name(x);
+}
diff --git a/ydb/core/tx/schemeshard/ut_helpers/ls_checks.h b/ydb/core/tx/schemeshard/ut_helpers/ls_checks.h
index 33022e4257..60f3960bae 100644
--- a/ydb/core/tx/schemeshard/ut_helpers/ls_checks.h
+++ b/ydb/core/tx/schemeshard/ut_helpers/ls_checks.h
@@ -117,6 +117,10 @@ namespace NLs {
TCheckFunc IndexKeys(const TVector<TString>& keyNames);
TCheckFunc IndexDataColumns(const TVector<TString>& dataColumnNames);
+ TCheckFunc StreamMode(NKikimrSchemeOp::ECdcStreamMode mode);
+ TCheckFunc StreamFormat(NKikimrSchemeOp::ECdcStreamFormat format);
+ TCheckFunc StreamState(NKikimrSchemeOp::ECdcStreamState state);
+
TCheckFunc HasBackupInFly(ui64 txId);
void NoBackupInFly(const NKikimrScheme::TEvDescribeSchemeResult& record);
TCheckFunc BackupHistoryCount(ui64 count);
diff --git a/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema b/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema
index cb12a5c1f7..0e494ef0bd 100644
--- a/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema
+++ b/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema
@@ -6085,6 +6085,11 @@
"ColumnId": 5,
"ColumnName": "Mode",
"ColumnType": "Uint32"
+ },
+ {
+ "ColumnId": 6,
+ "ColumnName": "Format",
+ "ColumnType": "Uint32"
}
],
"ColumnsDropped": [],
@@ -6095,7 +6100,8 @@
2,
3,
4,
- 5
+ 5,
+ 6
],
"RoomID": 0,
"Codec": 0,
@@ -6145,6 +6151,11 @@
"ColumnId": 5,
"ColumnName": "Mode",
"ColumnType": "Uint32"
+ },
+ {
+ "ColumnId": 6,
+ "ColumnName": "Format",
+ "ColumnType": "Uint32"
}
],
"ColumnsDropped": [],
@@ -6155,7 +6166,8 @@
2,
3,
4,
- 5
+ 5,
+ 6
],
"RoomID": 0,
"Codec": 0,