diff options
author | Ilnaz Nizametdinov <i.nizametdinov@gmail.com> | 2022-02-24 13:00:10 +0300 |
---|---|---|
committer | Ilnaz Nizametdinov <i.nizametdinov@gmail.com> | 2022-02-24 13:00:10 +0300 |
commit | 28f4ebc21f1f4c7d2e92741887e1f508489c203b (patch) | |
tree | d41cfc9c86e2b6f7a05959230fda0cfc9aae7a3d | |
parent | 0cd396f47ddebe98614ce96b7b0394a0a892b7cc (diff) | |
download | ydb-28f4ebc21f1f4c7d2e92741887e1f508489c203b.tar.gz |
Different stream formats in schema KIKIMR-14198
ref:d4333c42d2f73e80f858c71f5e0f465715bc3376
15 files changed, 141 insertions, 15 deletions
diff --git a/ydb/core/protos/flat_scheme_op.proto b/ydb/core/protos/flat_scheme_op.proto index 33e598c1c2..385b647c5e 100644 --- a/ydb/core/protos/flat_scheme_op.proto +++ b/ydb/core/protos/flat_scheme_op.proto @@ -710,9 +710,16 @@ enum ECdcStreamMode { ECdcStreamModeNewAndOldImages = 5; } +enum ECdcStreamFormat { + ECdcStreamFormatInvalid = 0; + ECdcStreamFormatProto = 1; + ECdcStreamFormatJson = 2; +} + message TCdcStreamDescription { optional string Name = 1; optional ECdcStreamMode Mode = 2; + optional ECdcStreamFormat Format = 6; optional NKikimrProto.TPathID PathId = 3; optional ECdcStreamState State = 4; optional uint64 SchemaVersion = 5; diff --git a/ydb/core/tx/datashard/datashard_ut_change_collector.cpp b/ydb/core/tx/datashard/datashard_ut_change_collector.cpp index 5f3f8524b9..fc406f6f42 100644 --- a/ydb/core/tx/datashard/datashard_ut_change_collector.cpp +++ b/ydb/core/tx/datashard/datashard_ut_change_collector.cpp @@ -669,19 +669,25 @@ Y_UNIT_TEST_SUITE(CdcStreamChangeCollector) { TCdcStream KeysOnly() { return TCdcStream{ - "keys_stream", NKikimrSchemeOp::ECdcStreamModeKeysOnly + .Name = "keys_stream", + .Mode = NKikimrSchemeOp::ECdcStreamModeKeysOnly, + .Format = NKikimrSchemeOp::ECdcStreamFormatProto, }; } TCdcStream Updates() { return TCdcStream{ - "updates_stream", NKikimrSchemeOp::ECdcStreamModeUpdate + .Name = "updates_stream", + .Mode = NKikimrSchemeOp::ECdcStreamModeUpdate, + .Format = NKikimrSchemeOp::ECdcStreamFormatProto, }; } TCdcStream NewAndOldImages() { return TCdcStream{ - "new_and_old_images", NKikimrSchemeOp::ECdcStreamModeNewAndOldImages + .Name = "new_and_old_images", + .Mode = NKikimrSchemeOp::ECdcStreamModeNewAndOldImages, + .Format = NKikimrSchemeOp::ECdcStreamFormatProto, }; } diff --git a/ydb/core/tx/datashard/datashard_ut_common.cpp b/ydb/core/tx/datashard/datashard_ut_common.cpp index e40e8eee88..ba8d92e6e2 100644 --- a/ydb/core/tx/datashard/datashard_ut_common.cpp +++ b/ydb/core/tx/datashard/datashard_ut_common.cpp @@ -1673,6 +1673,7 @@ ui64 AsyncAlterAddStream( desc.SetTableName(tableName); desc.MutableStreamDescription()->SetName(streamDesc.Name); desc.MutableStreamDescription()->SetMode(streamDesc.Mode); + desc.MutableStreamDescription()->SetFormat(streamDesc.Format); runtime.Send(new IEventHandle(MakeTxProxyID(), sender, request.Release())); auto ev = runtime.GrabEdgeEventRethrow<TEvTxUserProxy::TEvProposeTransactionStatus>(sender); diff --git a/ydb/core/tx/datashard/datashard_ut_common.h b/ydb/core/tx/datashard/datashard_ut_common.h index 99ec4bc979..0bf3c88942 100644 --- a/ydb/core/tx/datashard/datashard_ut_common.h +++ b/ydb/core/tx/datashard/datashard_ut_common.h @@ -394,9 +394,11 @@ struct TShardedTableOptions { struct TCdcStream { using EMode = NKikimrSchemeOp::ECdcStreamMode; + using EFormat = NKikimrSchemeOp::ECdcStreamFormat; TString Name; EMode Mode; + EFormat Format; }; #define TABLE_OPTION_IMPL(type, name, defaultValue) \ diff --git a/ydb/core/tx/schemeshard/schemeshard__init.cpp b/ydb/core/tx/schemeshard/schemeshard__init.cpp index 1560794a49..431ad31be3 100644 --- a/ydb/core/tx/schemeshard/schemeshard__init.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__init.cpp @@ -2734,6 +2734,7 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> { ); auto alterVersion = rowset.GetValue<Schema::CdcStream::AlterVersion>(); auto mode = rowset.GetValue<Schema::CdcStream::Mode>(); + auto format = rowset.GetValue<Schema::CdcStream::Format>(); auto state = rowset.GetValue<Schema::CdcStream::State>(); Y_VERIFY_S(Self->PathsById.contains(pathId), "Path doesn't exist, pathId: " << pathId); @@ -2744,7 +2745,7 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> { << ", path type: " << NKikimrSchemeOp::EPathType_Name(path->PathType)); Y_VERIFY(!Self->CdcStreams.contains(pathId)); - Self->CdcStreams[pathId] = new TCdcStreamInfo(alterVersion, mode, state); + Self->CdcStreams[pathId] = new TCdcStreamInfo(alterVersion, mode, format, state); Self->IncrementPathDbRefCount(pathId); if (!rowset.Next()) { @@ -2768,6 +2769,7 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> { auto alterVersion = rowset.GetValue<Schema::CdcStreamAlterData::AlterVersion>(); auto mode = rowset.GetValue<Schema::CdcStreamAlterData::Mode>(); + auto format = rowset.GetValue<Schema::CdcStreamAlterData::Format>(); auto state = rowset.GetValue<Schema::CdcStreamAlterData::State>(); Y_VERIFY_S(Self->PathsById.contains(pathId), "Path doesn't exist, pathId: " << pathId); @@ -2779,14 +2781,14 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> { if (!Self->CdcStreams.contains(pathId)) { Y_VERIFY(alterVersion == 1); - Self->CdcStreams[pathId] = TCdcStreamInfo::New(mode); + Self->CdcStreams[pathId] = TCdcStreamInfo::New(mode, format); Self->IncrementPathDbRefCount(pathId); } auto stream = Self->CdcStreams.at(pathId); Y_VERIFY(stream->AlterData == nullptr); Y_VERIFY(stream->AlterVersion < alterVersion); - stream->AlterData = new TCdcStreamInfo(alterVersion, mode, state); + stream->AlterData = new TCdcStreamInfo(alterVersion, mode, format, state); Y_VERIFY_S(Self->PathsById.contains(path->ParentPathId), "Parent path is not found" << ", cdc stream pathId: " << pathId diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp index 01e4cd9df8..e8a91bf4bd 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp @@ -209,6 +209,16 @@ public: return result; } + switch (streamDesc.GetFormat()) { + case NKikimrSchemeOp::ECdcStreamFormatProto: + case NKikimrSchemeOp::ECdcStreamFormatJson: + break; + default: + result->SetError(NKikimrScheme::StatusInvalidParameter, TStringBuilder() + << "Invalid stream format: " << static_cast<ui32>(streamDesc.GetFormat())); + return result; + } + TString errStr; if (!context.SS->CheckLocks(tablePath.Base()->PathId, Transaction, errStr)) { result->SetError(NKikimrScheme::StatusMultipleModifications, errStr); diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.cpp b/ydb/core/tx/schemeshard/schemeshard_impl.cpp index 640c0b0775..d88e53c376 100644 --- a/ydb/core/tx/schemeshard/schemeshard_impl.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_impl.cpp @@ -1410,6 +1410,7 @@ void TSchemeShard::PersistCdcStream(NIceDb::TNiceDb& db, const TPathId& pathId) db.Table<Schema::CdcStream>().Key(pathId.OwnerId, pathId.LocalPathId).Update( NIceDb::TUpdate<Schema::CdcStream::AlterVersion>(alterData->AlterVersion), NIceDb::TUpdate<Schema::CdcStream::Mode>(alterData->Mode), + NIceDb::TUpdate<Schema::CdcStream::Format>(alterData->Format), NIceDb::TUpdate<Schema::CdcStream::State>(alterData->State) ); @@ -1432,6 +1433,7 @@ void TSchemeShard::PersistCdcStreamAlterData(NIceDb::TNiceDb& db, const TPathId& db.Table<Schema::CdcStreamAlterData>().Key(pathId.OwnerId, pathId.LocalPathId).Update( NIceDb::TUpdate<Schema::CdcStreamAlterData::AlterVersion>(alterData->AlterVersion), NIceDb::TUpdate<Schema::CdcStreamAlterData::Mode>(alterData->Mode), + NIceDb::TUpdate<Schema::CdcStreamAlterData::Format>(alterData->Format), NIceDb::TUpdate<Schema::CdcStreamAlterData::State>(alterData->State) ); } diff --git a/ydb/core/tx/schemeshard/schemeshard_info_types.h b/ydb/core/tx/schemeshard/schemeshard_info_types.h index fd0bfd5183..591e6ceb3c 100644 --- a/ydb/core/tx/schemeshard/schemeshard_info_types.h +++ b/ydb/core/tx/schemeshard/schemeshard_info_types.h @@ -2103,11 +2103,13 @@ struct TTableIndexInfo : public TSimpleRefCount<TTableIndexInfo> { struct TCdcStreamInfo : public TSimpleRefCount<TCdcStreamInfo> { using TPtr = TIntrusivePtr<TCdcStreamInfo>; using EMode = NKikimrSchemeOp::ECdcStreamMode; + using EFormat = NKikimrSchemeOp::ECdcStreamFormat; using EState = NKikimrSchemeOp::ECdcStreamState; - TCdcStreamInfo(ui64 version, EMode mode, EState state) + TCdcStreamInfo(ui64 version, EMode mode, EFormat format, EState state) : AlterVersion(version) , Mode(mode) + , Format(format) , State(state) {} @@ -2121,12 +2123,12 @@ struct TCdcStreamInfo : public TSimpleRefCount<TCdcStreamInfo> { return result; } - static TPtr New(EMode mode) { - return new TCdcStreamInfo(0, mode, EState::ECdcStreamStateInvalid); + static TPtr New(EMode mode, EFormat format) { + return new TCdcStreamInfo(0, mode, format, EState::ECdcStreamStateInvalid); } static TPtr Create(const NKikimrSchemeOp::TCdcStreamDescription& desc) { - TPtr result = New(desc.GetMode()); + TPtr result = New(desc.GetMode(), desc.GetFormat()); TPtr alterData = result->CreateNextVersion(); alterData->State = EState::ECdcStreamStateReady; @@ -2135,6 +2137,7 @@ struct TCdcStreamInfo : public TSimpleRefCount<TCdcStreamInfo> { ui64 AlterVersion = 1; EMode Mode; + EFormat Format; EState State; TCdcStreamInfo::TPtr AlterData = nullptr; diff --git a/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp b/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp index 3789d3af1b..4b178640bf 100644 --- a/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp @@ -1027,6 +1027,7 @@ void TSchemeShard::DescribeCdcStream(const TPathId& pathId, const TString& name, desc.SetName(name); desc.SetMode(info->Mode); + desc.SetFormat(info->Format); PathIdFromPathId(pathId, desc.MutablePathId()); desc.SetState(info->State); desc.SetSchemaVersion(info->AlterVersion); diff --git a/ydb/core/tx/schemeshard/schemeshard_schema.h b/ydb/core/tx/schemeshard/schemeshard_schema.h index cc19895d88..2a4e3bc598 100644 --- a/ydb/core/tx/schemeshard/schemeshard_schema.h +++ b/ydb/core/tx/schemeshard/schemeshard_schema.h @@ -1537,9 +1537,10 @@ struct Schema : NIceDb::Schema { struct AlterVersion : Column<3, NScheme::NTypeIds::Uint64> {}; struct State : Column<4, NScheme::NTypeIds::Uint32> { using Type = NKikimrSchemeOp::ECdcStreamState; static constexpr Type Default = NKikimrSchemeOp::ECdcStreamStateInvalid; }; struct Mode : Column<5, NScheme::NTypeIds::Uint32> { using Type = NKikimrSchemeOp::ECdcStreamMode; static constexpr Type Default = NKikimrSchemeOp::ECdcStreamModeInvalid; }; + struct Format : Column<6, NScheme::NTypeIds::Uint32> { using Type = NKikimrSchemeOp::ECdcStreamFormat; static constexpr Type Default = NKikimrSchemeOp::ECdcStreamFormatInvalid; }; using TKey = TableKey<OwnerPathId, LocalPathId>; - using TColumns = TableColumns<OwnerPathId, LocalPathId, AlterVersion, State, Mode>; + using TColumns = TableColumns<OwnerPathId, LocalPathId, AlterVersion, State, Mode, Format>; }; struct CdcStreamAlterData : Table<96> { @@ -1548,9 +1549,10 @@ struct Schema : NIceDb::Schema { struct AlterVersion : Column<3, NScheme::NTypeIds::Uint64> {}; struct State : Column<4, NScheme::NTypeIds::Uint32> { using Type = NKikimrSchemeOp::ECdcStreamState; static constexpr Type Default = NKikimrSchemeOp::ECdcStreamStateInvalid; }; struct Mode : Column<5, NScheme::NTypeIds::Uint32> { using Type = NKikimrSchemeOp::ECdcStreamMode; static constexpr Type Default = NKikimrSchemeOp::ECdcStreamModeInvalid; }; + struct Format : Column<6, NScheme::NTypeIds::Uint32> { using Type = NKikimrSchemeOp::ECdcStreamFormat; static constexpr Type Default = NKikimrSchemeOp::ECdcStreamFormatInvalid; }; using TKey = TableKey<OwnerPathId, LocalPathId>; - using TColumns = TableColumns<OwnerPathId, LocalPathId, AlterVersion, State, Mode>; + using TColumns = TableColumns<OwnerPathId, LocalPathId, AlterVersion, State, Mode, Format>; }; struct Sequences : Table<97> { diff --git a/ydb/core/tx/schemeshard/ut_cdc_stream.cpp b/ydb/core/tx/schemeshard/ut_cdc_stream.cpp index 5090fe7df6..7bdbf5327f 100644 --- a/ydb/core/tx/schemeshard/ut_cdc_stream.cpp +++ b/ydb/core/tx/schemeshard/ut_cdc_stream.cpp @@ -23,11 +23,17 @@ Y_UNIT_TEST_SUITE(TCdcStreamTests) { StreamDescription { Name: "Stream" Mode: ECdcStreamModeKeysOnly + Format: ECdcStreamFormatProto } )"); env.TestWaitNotification(runtime, txId); - TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/Stream"), {NLs::PathExist}); + TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/Stream"), { + NLs::PathExist, + NLs::StreamMode(NKikimrSchemeOp::ECdcStreamModeKeysOnly), + NLs::StreamFormat(NKikimrSchemeOp::ECdcStreamFormatProto), + NLs::StreamState(NKikimrSchemeOp::ECdcStreamStateReady), + }); TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/Stream/streamImpl"), {NLs::PathExist}); TestAlterCdcStream(runtime, ++txId, "/MyRoot", R"( @@ -37,6 +43,13 @@ Y_UNIT_TEST_SUITE(TCdcStreamTests) { )"); env.TestWaitNotification(runtime, txId); + TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/Stream"), { + NLs::PathExist, + NLs::StreamMode(NKikimrSchemeOp::ECdcStreamModeKeysOnly), + NLs::StreamFormat(NKikimrSchemeOp::ECdcStreamFormatProto), + NLs::StreamState(NKikimrSchemeOp::ECdcStreamStateDisabled), + }); + TestDropCdcStream(runtime, ++txId, "/MyRoot", R"( TableName: "Table" StreamName: "Stream" @@ -57,6 +70,7 @@ Y_UNIT_TEST_SUITE(TCdcStreamTests) { StreamDescription { Name: "Stream" Mode: ECdcStreamModeKeysOnly + Format: ECdcStreamFormatProto } )", {NKikimrScheme::StatusNameConflict}); @@ -65,6 +79,7 @@ Y_UNIT_TEST_SUITE(TCdcStreamTests) { StreamDescription { Name: "Stream" Mode: ECdcStreamModeKeysOnly + Format: ECdcStreamFormatProto } )", {NKikimrScheme::StatusPathDoesNotExist}); @@ -87,6 +102,7 @@ Y_UNIT_TEST_SUITE(TCdcStreamTests) { StreamDescription { Name: "Index" Mode: ECdcStreamModeKeysOnly + Format: ECdcStreamFormatProto } )", {NKikimrScheme::StatusNameConflict}); @@ -95,6 +111,7 @@ Y_UNIT_TEST_SUITE(TCdcStreamTests) { StreamDescription { Name: "Stream" Mode: ECdcStreamModeKeysOnly + Format: ECdcStreamFormatProto } )", {NKikimrScheme::StatusNameConflict}); @@ -110,6 +127,7 @@ Y_UNIT_TEST_SUITE(TCdcStreamTests) { StreamDescription { Name: "Stream" Mode: ECdcStreamModeKeysOnly + Format: ECdcStreamFormatProto } )"); env.TestWaitNotification(runtime, txId); @@ -122,6 +140,7 @@ Y_UNIT_TEST_SUITE(TCdcStreamTests) { StreamDescription { Name: "Stream" Mode: ECdcStreamModeKeysOnly + Format: ECdcStreamFormatProto } )", {NKikimrScheme::StatusPathDoesNotExist}); } @@ -168,6 +187,7 @@ Y_UNIT_TEST_SUITE(TCdcStreamTests) { StreamDescription { Name: "Stream" Mode: ECdcStreamModeKeysOnly + Format: ECdcStreamFormatProto } )"); env.TestWaitNotification(runtime, txId); @@ -211,6 +231,7 @@ Y_UNIT_TEST_SUITE(TCdcStreamTests) { StreamDescription { Name: "Stream" Mode: ECdcStreamModeKeysOnly + Format: ECdcStreamFormatProto } )"); env.TestWaitNotification(runtime, txId); @@ -242,6 +263,7 @@ Y_UNIT_TEST_SUITE(TCdcStreamTests) { StreamDescription { Name: "Stream" Mode: ECdcStreamModeKeysOnly + Format: ECdcStreamFormatProto } )"); env.TestWaitNotification(runtime, txId); @@ -267,6 +289,7 @@ Y_UNIT_TEST_SUITE(TCdcStreamTests) { StreamDescription { Name: "Stream" Mode: ECdcStreamModeKeysOnly + Format: ECdcStreamFormatProto } )"); env.TestWaitNotification(runtime, txId); @@ -297,6 +320,7 @@ Y_UNIT_TEST_SUITE(TCdcStreamTests) { StreamDescription { Name: "Stream" Mode: ECdcStreamModeKeysOnly + Format: ECdcStreamFormatProto } )"); env.TestWaitNotification(runtime, txId); @@ -325,6 +349,7 @@ Y_UNIT_TEST_SUITE(TCdcStreamTests) { StreamDescription { Name: "Stream" Mode: ECdcStreamModeKeysOnly + Format: ECdcStreamFormatProto } )"); env.TestWaitNotification(runtime, txId); diff --git a/ydb/core/tx/schemeshard/ut_cdc_stream_reboots.cpp b/ydb/core/tx/schemeshard/ut_cdc_stream_reboots.cpp index 376544ace8..c512c74f30 100644 --- a/ydb/core/tx/schemeshard/ut_cdc_stream_reboots.cpp +++ b/ydb/core/tx/schemeshard/ut_cdc_stream_reboots.cpp @@ -22,6 +22,7 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) { StreamDescription { Name: "Stream" Mode: ECdcStreamModeKeysOnly + Format: ECdcStreamFormatProto } )"); t.TestEnv->TestWaitNotification(runtime, t.TxId); @@ -48,9 +49,17 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) { StreamDescription { Name: "Stream" Mode: ECdcStreamModeKeysOnly + Format: ECdcStreamFormatProto } )"); t.TestEnv->TestWaitNotification(runtime, t.TxId); + + TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/Stream"), { + NLs::PathExist, + NLs::StreamMode(NKikimrSchemeOp::ECdcStreamModeKeysOnly), + NLs::StreamFormat(NKikimrSchemeOp::ECdcStreamFormatProto), + NLs::StreamState(NKikimrSchemeOp::ECdcStreamStateReady), + }); } auto request = AlterCdcStreamRequest(++t.TxId, "/MyRoot", R"( @@ -63,6 +72,13 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) { NKikimrScheme::StatusMultipleModifications, }); t.TestEnv->TestWaitNotification(runtime, t.TxId); + + TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/Stream"), { + NLs::PathExist, + NLs::StreamMode(NKikimrSchemeOp::ECdcStreamModeKeysOnly), + NLs::StreamFormat(NKikimrSchemeOp::ECdcStreamFormatProto), + NLs::StreamState(NKikimrSchemeOp::ECdcStreamStateDisabled), + }); }); } @@ -84,6 +100,7 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) { StreamDescription { Name: "Stream" Mode: ECdcStreamModeKeysOnly + Format: ECdcStreamFormatProto } )"); t.TestEnv->TestWaitNotification(runtime, t.TxId); @@ -119,6 +136,7 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) { StreamDescription { Name: "Stream" Mode: ECdcStreamModeKeysOnly + Format: ECdcStreamFormatProto } )"); t.TestEnv->ReliablePropose(runtime, request, { @@ -149,6 +167,7 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) { StreamDescription { Name: "Stream" Mode: ECdcStreamModeKeysOnly + Format: ECdcStreamFormatProto } )"); t.TestEnv->ReliablePropose(runtime, request, { diff --git a/ydb/core/tx/schemeshard/ut_helpers/ls_checks.cpp b/ydb/core/tx/schemeshard/ut_helpers/ls_checks.cpp index c5d6c5a973..4560c2b96f 100644 --- a/ydb/core/tx/schemeshard/ut_helpers/ls_checks.cpp +++ b/ydb/core/tx/schemeshard/ut_helpers/ls_checks.cpp @@ -681,6 +681,24 @@ TCheckFunc IndexDataColumns(const TVector<TString>& dataColumnNames) { }; } +TCheckFunc StreamMode(NKikimrSchemeOp::ECdcStreamMode mode) { + return [=] (const NKikimrScheme::TEvDescribeSchemeResult& record) { + UNIT_ASSERT_VALUES_EQUAL(record.GetPathDescription().GetCdcStreamDescription().GetMode(), mode); + }; +} + +TCheckFunc StreamFormat(NKikimrSchemeOp::ECdcStreamFormat format) { + return [=] (const NKikimrScheme::TEvDescribeSchemeResult& record) { + UNIT_ASSERT_VALUES_EQUAL(record.GetPathDescription().GetCdcStreamDescription().GetFormat(), format); + }; +} + +TCheckFunc StreamState(NKikimrSchemeOp::ECdcStreamState state) { + return [=] (const NKikimrScheme::TEvDescribeSchemeResult& record) { + UNIT_ASSERT_VALUES_EQUAL(record.GetPathDescription().GetCdcStreamDescription().GetState(), state); + }; +} + void NoChildren(const NKikimrScheme::TEvDescribeSchemeResult& record) { ChildrenCount(0)(record); } @@ -1010,3 +1028,15 @@ Y_DECLARE_OUT_SPEC(inline, NKikimrSchemeOp::EIndexType, o, x) { Y_DECLARE_OUT_SPEC(inline, NKikimrSchemeOp::EIndexState, o, x) { o << NKikimrSchemeOp::EIndexState_Name(x); } + +Y_DECLARE_OUT_SPEC(inline, NKikimrSchemeOp::ECdcStreamMode, o, x) { + o << NKikimrSchemeOp::ECdcStreamMode_Name(x); +} + +Y_DECLARE_OUT_SPEC(inline, NKikimrSchemeOp::ECdcStreamFormat, o, x) { + o << NKikimrSchemeOp::ECdcStreamFormat_Name(x); +} + +Y_DECLARE_OUT_SPEC(inline, NKikimrSchemeOp::ECdcStreamState, o, x) { + o << NKikimrSchemeOp::ECdcStreamState_Name(x); +} diff --git a/ydb/core/tx/schemeshard/ut_helpers/ls_checks.h b/ydb/core/tx/schemeshard/ut_helpers/ls_checks.h index 33022e4257..60f3960bae 100644 --- a/ydb/core/tx/schemeshard/ut_helpers/ls_checks.h +++ b/ydb/core/tx/schemeshard/ut_helpers/ls_checks.h @@ -117,6 +117,10 @@ namespace NLs { TCheckFunc IndexKeys(const TVector<TString>& keyNames); TCheckFunc IndexDataColumns(const TVector<TString>& dataColumnNames); + TCheckFunc StreamMode(NKikimrSchemeOp::ECdcStreamMode mode); + TCheckFunc StreamFormat(NKikimrSchemeOp::ECdcStreamFormat format); + TCheckFunc StreamState(NKikimrSchemeOp::ECdcStreamState state); + TCheckFunc HasBackupInFly(ui64 txId); void NoBackupInFly(const NKikimrScheme::TEvDescribeSchemeResult& record); TCheckFunc BackupHistoryCount(ui64 count); diff --git a/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema b/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema index cb12a5c1f7..0e494ef0bd 100644 --- a/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema +++ b/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema @@ -6085,6 +6085,11 @@ "ColumnId": 5, "ColumnName": "Mode", "ColumnType": "Uint32" + }, + { + "ColumnId": 6, + "ColumnName": "Format", + "ColumnType": "Uint32" } ], "ColumnsDropped": [], @@ -6095,7 +6100,8 @@ 2, 3, 4, - 5 + 5, + 6 ], "RoomID": 0, "Codec": 0, @@ -6145,6 +6151,11 @@ "ColumnId": 5, "ColumnName": "Mode", "ColumnType": "Uint32" + }, + { + "ColumnId": 6, + "ColumnName": "Format", + "ColumnType": "Uint32" } ], "ColumnsDropped": [], @@ -6155,7 +6166,8 @@ 2, 3, 4, - 5 + 5, + 6 ], "RoomID": 0, "Codec": 0, |