diff options
author | ilnaz <ilnaz@ydb.tech> | 2023-05-30 18:19:27 +0300 |
---|---|---|
committer | ilnaz <ilnaz@ydb.tech> | 2023-05-30 18:19:27 +0300 |
commit | aaba39f70f793486278aa4453d51119d3c19d256 (patch) | |
tree | 3d29adb67f1f5d07120435950326b06d497bcd8b | |
parent | e6aaf084808980d163b4c04ccb77829d54d14550 (diff) | |
download | ydb-aaba39f70f793486278aa4453d51119d3c19d256.tar.gz |
ResolvedTimestamps option in cdc stream description
11 files changed, 125 insertions, 11 deletions
diff --git a/ydb/core/protos/flat_scheme_op.proto b/ydb/core/protos/flat_scheme_op.proto index fa09828683..3c60c397d7 100644 --- a/ydb/core/protos/flat_scheme_op.proto +++ b/ydb/core/protos/flat_scheme_op.proto @@ -745,6 +745,8 @@ message TCdcStreamDescription { repeated TUserAttribute UserAttributes = 8; // AwsRegion used to mark records in DynamoDB-compatible mode (FormatDynamoDBStreamsJson) optional string AwsRegion = 9; + // Set to '0' to disable resolved timestamps + optional uint64 ResolvedTimestampsIntervalMs = 10; } message TCreateCdcStream { diff --git a/ydb/core/tx/schemeshard/schemeshard__init.cpp b/ydb/core/tx/schemeshard/schemeshard__init.cpp index 9fc20106e1..88586f1310 100644 --- a/ydb/core/tx/schemeshard/schemeshard__init.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__init.cpp @@ -2822,6 +2822,7 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> { auto mode = rowset.GetValue<Schema::CdcStream::Mode>(); auto format = rowset.GetValue<Schema::CdcStream::Format>(); auto vt = rowset.GetValueOrDefault<Schema::CdcStream::VirtualTimestamps>(false); + auto rt = TDuration::MilliSeconds(rowset.GetValueOrDefault<Schema::CdcStream::ResolvedTimestampsIntervalMs>(0)); auto awsRegion = rowset.GetValue<Schema::CdcStream::AwsRegion>(); auto state = rowset.GetValue<Schema::CdcStream::State>(); @@ -2833,7 +2834,7 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> { << ", path type: " << NKikimrSchemeOp::EPathType_Name(path->PathType)); Y_VERIFY(!Self->CdcStreams.contains(pathId)); - Self->CdcStreams[pathId] = new TCdcStreamInfo(alterVersion, mode, format, vt, awsRegion, state); + Self->CdcStreams[pathId] = new TCdcStreamInfo(alterVersion, mode, format, vt, rt, awsRegion, state); Self->IncrementPathDbRefCount(pathId); if (state == NKikimrSchemeOp::ECdcStreamStateScan) { @@ -2866,6 +2867,7 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> { auto mode = rowset.GetValue<Schema::CdcStreamAlterData::Mode>(); auto format = rowset.GetValue<Schema::CdcStreamAlterData::Format>(); auto vt = rowset.GetValueOrDefault<Schema::CdcStreamAlterData::VirtualTimestamps>(false); + auto rt = TDuration::MilliSeconds(rowset.GetValueOrDefault<Schema::CdcStreamAlterData::ResolvedTimestampsIntervalMs>(0)); auto awsRegion = rowset.GetValue<Schema::CdcStreamAlterData::AwsRegion>(); auto state = rowset.GetValue<Schema::CdcStreamAlterData::State>(); @@ -2878,14 +2880,14 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> { if (!Self->CdcStreams.contains(pathId)) { Y_VERIFY(alterVersion == 1); - Self->CdcStreams[pathId] = TCdcStreamInfo::New(mode, format, vt, awsRegion); + Self->CdcStreams[pathId] = TCdcStreamInfo::New(mode, format, vt, rt, awsRegion); Self->IncrementPathDbRefCount(pathId); } auto stream = Self->CdcStreams.at(pathId); Y_VERIFY(stream->AlterData == nullptr); Y_VERIFY(stream->AlterVersion < alterVersion); - stream->AlterData = new TCdcStreamInfo(alterVersion, mode, format, vt, awsRegion, state); + stream->AlterData = new TCdcStreamInfo(alterVersion, mode, format, vt, rt, awsRegion, state); Y_VERIFY_S(Self->PathsById.contains(path->ParentPathId), "Parent path is not found" << ", cdc stream pathId: " << pathId diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.cpp b/ydb/core/tx/schemeshard/schemeshard_impl.cpp index e5f35481e5..7a361f224f 100644 --- a/ydb/core/tx/schemeshard/schemeshard_impl.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_impl.cpp @@ -1546,6 +1546,7 @@ void TSchemeShard::PersistCdcStream(NIceDb::TNiceDb& db, const TPathId& pathId) NIceDb::TUpdate<Schema::CdcStream::Mode>(alterData->Mode), NIceDb::TUpdate<Schema::CdcStream::Format>(alterData->Format), NIceDb::TUpdate<Schema::CdcStream::VirtualTimestamps>(alterData->VirtualTimestamps), + NIceDb::TUpdate<Schema::CdcStream::ResolvedTimestampsIntervalMs>(alterData->ResolvedTimestamps.MilliSeconds()), NIceDb::TUpdate<Schema::CdcStream::AwsRegion>(alterData->AwsRegion), NIceDb::TUpdate<Schema::CdcStream::State>(alterData->State) ); @@ -1571,6 +1572,7 @@ void TSchemeShard::PersistCdcStreamAlterData(NIceDb::TNiceDb& db, const TPathId& NIceDb::TUpdate<Schema::CdcStreamAlterData::Mode>(alterData->Mode), NIceDb::TUpdate<Schema::CdcStreamAlterData::Format>(alterData->Format), NIceDb::TUpdate<Schema::CdcStreamAlterData::VirtualTimestamps>(alterData->VirtualTimestamps), + NIceDb::TUpdate<Schema::CdcStreamAlterData::ResolvedTimestampsIntervalMs>(alterData->ResolvedTimestamps.MilliSeconds()), NIceDb::TUpdate<Schema::CdcStreamAlterData::AwsRegion>(alterData->AwsRegion), NIceDb::TUpdate<Schema::CdcStreamAlterData::State>(alterData->State) ); diff --git a/ydb/core/tx/schemeshard/schemeshard_info_types.h b/ydb/core/tx/schemeshard/schemeshard_info_types.h index bd9ade2fad..1acc8690e7 100644 --- a/ydb/core/tx/schemeshard/schemeshard_info_types.h +++ b/ydb/core/tx/schemeshard/schemeshard_info_types.h @@ -2447,11 +2447,12 @@ struct TCdcStreamInfo : public TSimpleRefCount<TCdcStreamInfo> { static constexpr ui32 MaxInProgressShards = 10; - TCdcStreamInfo(ui64 version, EMode mode, EFormat format, bool vt, const TString& awsRegion, EState state) + TCdcStreamInfo(ui64 version, EMode mode, EFormat format, bool vt, const TDuration& rt, const TString& awsRegion, EState state) : AlterVersion(version) , Mode(mode) , Format(format) , VirtualTimestamps(vt) + , ResolvedTimestamps(rt) , AwsRegion(awsRegion) , State(state) {} @@ -2466,12 +2467,13 @@ struct TCdcStreamInfo : public TSimpleRefCount<TCdcStreamInfo> { return result; } - static TPtr New(EMode mode, EFormat format, bool vt, const TString& awsRegion) { - return new TCdcStreamInfo(0, mode, format, vt, awsRegion, EState::ECdcStreamStateInvalid); + static TPtr New(EMode mode, EFormat format, bool vt, const TDuration& rt, const TString& awsRegion) { + return new TCdcStreamInfo(0, mode, format, vt, rt, awsRegion, EState::ECdcStreamStateInvalid); } static TPtr Create(const NKikimrSchemeOp::TCdcStreamDescription& desc) { - TPtr result = New(desc.GetMode(), desc.GetFormat(), desc.GetVirtualTimestamps(), desc.GetAwsRegion()); + TPtr result = New(desc.GetMode(), desc.GetFormat(), desc.GetVirtualTimestamps(), + TDuration::MilliSeconds(desc.GetResolvedTimestampsIntervalMs()), desc.GetAwsRegion()); TPtr alterData = result->CreateNextVersion(); alterData->State = EState::ECdcStreamStateReady; if (desc.HasState()) { @@ -2485,6 +2487,7 @@ struct TCdcStreamInfo : public TSimpleRefCount<TCdcStreamInfo> { EMode Mode; EFormat Format; bool VirtualTimestamps; + TDuration ResolvedTimestamps; TString AwsRegion; EState State; diff --git a/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp b/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp index 9634fae398..8c40125151 100644 --- a/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp @@ -1181,6 +1181,7 @@ void TSchemeShard::DescribeCdcStream(const TPathId& pathId, const TString& name, desc.SetMode(info->Mode); desc.SetFormat(info->Format); desc.SetVirtualTimestamps(info->VirtualTimestamps); + desc.SetResolvedTimestampsIntervalMs(info->ResolvedTimestamps.MilliSeconds()); desc.SetAwsRegion(info->AwsRegion); PathIdFromPathId(pathId, desc.MutablePathId()); desc.SetState(info->State); diff --git a/ydb/core/tx/schemeshard/schemeshard_schema.h b/ydb/core/tx/schemeshard/schemeshard_schema.h index eac72c5fe7..c9d88ec9dc 100644 --- a/ydb/core/tx/schemeshard/schemeshard_schema.h +++ b/ydb/core/tx/schemeshard/schemeshard_schema.h @@ -1550,9 +1550,20 @@ struct Schema : NIceDb::Schema { struct Format : Column<6, NScheme::NTypeIds::Uint32> { using Type = NKikimrSchemeOp::ECdcStreamFormat; static constexpr Type Default = NKikimrSchemeOp::ECdcStreamFormatInvalid; }; struct VirtualTimestamps : Column<7, NScheme::NTypeIds::Bool> {}; struct AwsRegion : Column<8, NScheme::NTypeIds::Utf8> {}; + struct ResolvedTimestampsIntervalMs : Column<9, NScheme::NTypeIds::Uint64> {}; using TKey = TableKey<OwnerPathId, LocalPathId>; - using TColumns = TableColumns<OwnerPathId, LocalPathId, AlterVersion, State, Mode, Format, VirtualTimestamps, AwsRegion>; + using TColumns = TableColumns< + OwnerPathId, + LocalPathId, + AlterVersion, + State, + Mode, + Format, + VirtualTimestamps, + AwsRegion, + ResolvedTimestampsIntervalMs + >; }; struct CdcStreamAlterData : Table<96> { @@ -1564,9 +1575,20 @@ struct Schema : NIceDb::Schema { struct Format : Column<6, NScheme::NTypeIds::Uint32> { using Type = NKikimrSchemeOp::ECdcStreamFormat; static constexpr Type Default = NKikimrSchemeOp::ECdcStreamFormatInvalid; }; struct VirtualTimestamps : Column<7, NScheme::NTypeIds::Bool> {}; struct AwsRegion : Column<8, NScheme::NTypeIds::Utf8> {}; + struct ResolvedTimestampsIntervalMs : Column<9, NScheme::NTypeIds::Uint64> {}; using TKey = TableKey<OwnerPathId, LocalPathId>; - using TColumns = TableColumns<OwnerPathId, LocalPathId, AlterVersion, State, Mode, Format, VirtualTimestamps, AwsRegion>; + using TColumns = TableColumns< + OwnerPathId, + LocalPathId, + AlterVersion, + State, + Mode, + Format, + VirtualTimestamps, + AwsRegion, + ResolvedTimestampsIntervalMs + >; }; struct CdcStreamScanShardStatus : Table<103> { diff --git a/ydb/core/tx/schemeshard/ut_cdc_stream.cpp b/ydb/core/tx/schemeshard/ut_cdc_stream.cpp index 7946c6cf4d..3cd979308e 100644 --- a/ydb/core/tx/schemeshard/ut_cdc_stream.cpp +++ b/ydb/core/tx/schemeshard/ut_cdc_stream.cpp @@ -103,6 +103,35 @@ Y_UNIT_TEST_SUITE(TCdcStreamTests) { }); } + Y_UNIT_TEST(ResolvedTimestamps) { + TTestBasicRuntime runtime; + TTestEnv env(runtime, TTestEnvOptions().EnableProtoSourceIdInfo(true)); + ui64 txId = 100; + + TestCreateTable(runtime, ++txId, "/MyRoot", R"( + Name: "Table" + Columns { Name: "key" Type: "Uint64" } + Columns { Name: "value" Type: "Uint64" } + KeyColumnNames: ["key"] + )"); + env.TestWaitNotification(runtime, txId); + + TestCreateCdcStream(runtime, ++txId, "/MyRoot", R"( + TableName: "Table" + StreamDescription { + Name: "Stream" + Mode: ECdcStreamModeKeysOnly + Format: ECdcStreamFormatProto + ResolvedTimestampsIntervalMs: 1000 + } + )"); + env.TestWaitNotification(runtime, txId); + + TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/Stream"), { + NLs::StreamResolvedTimestamps(TDuration::MilliSeconds(1000)), + }); + } + Y_UNIT_TEST(RetentionPeriod) { TTestBasicRuntime runtime; TTestEnv env(runtime, TTestEnvOptions().EnableProtoSourceIdInfo(true)); diff --git a/ydb/core/tx/schemeshard/ut_cdc_stream_reboots.cpp b/ydb/core/tx/schemeshard/ut_cdc_stream_reboots.cpp index fb10c2298d..e5da46f3d9 100644 --- a/ydb/core/tx/schemeshard/ut_cdc_stream_reboots.cpp +++ b/ydb/core/tx/schemeshard/ut_cdc_stream_reboots.cpp @@ -105,6 +105,40 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) { }); } + Y_UNIT_TEST(CreateStreamWithResolvedTimestamps) { + TTestWithReboots t; + t.Run([&](TTestActorRuntime& runtime, bool& activeZone) { + { + TInactiveZone inactive(activeZone); + TestCreateTable(runtime, ++t.TxId, "/MyRoot", R"( + Name: "Table" + Columns { Name: "key" Type: "Uint64" } + Columns { Name: "value" Type: "Uint64" } + KeyColumnNames: ["key"] + )"); + t.TestEnv->TestWaitNotification(runtime, t.TxId); + } + + TestCreateCdcStream(runtime, ++t.TxId, "/MyRoot", R"( + TableName: "Table" + StreamDescription { + Name: "Stream" + Mode: ECdcStreamModeKeysOnly + Format: ECdcStreamFormatProto + ResolvedTimestampsIntervalMs: 1000 + } + )"); + t.TestEnv->TestWaitNotification(runtime, t.TxId); + + { + TInactiveZone inactive(activeZone); + TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/Stream"), { + NLs::StreamResolvedTimestamps(TDuration::MilliSeconds(1000)), + }); + } + }); + } + Y_UNIT_TEST(DisableStream) { TTestWithReboots t; t.Run([&](TTestActorRuntime& runtime, bool& activeZone) { diff --git a/ydb/core/tx/schemeshard/ut_helpers/ls_checks.cpp b/ydb/core/tx/schemeshard/ut_helpers/ls_checks.cpp index 4aa4cab021..162d82f206 100644 --- a/ydb/core/tx/schemeshard/ut_helpers/ls_checks.cpp +++ b/ydb/core/tx/schemeshard/ut_helpers/ls_checks.cpp @@ -801,6 +801,12 @@ TCheckFunc StreamVirtualTimestamps(bool value) { }; } +TCheckFunc StreamResolvedTimestamps(const TDuration& value) { + return [=] (const NKikimrScheme::TEvDescribeSchemeResult& record) { + UNIT_ASSERT_VALUES_EQUAL(record.GetPathDescription().GetCdcStreamDescription().GetResolvedTimestampsIntervalMs(), value.MilliSeconds()); + }; +} + TCheckFunc StreamAwsRegion(const TString& value) { return [=] (const NKikimrScheme::TEvDescribeSchemeResult& record) { UNIT_ASSERT_VALUES_EQUAL(record.GetPathDescription().GetCdcStreamDescription().GetAwsRegion(), value); diff --git a/ydb/core/tx/schemeshard/ut_helpers/ls_checks.h b/ydb/core/tx/schemeshard/ut_helpers/ls_checks.h index 9738370f94..0168972abf 100644 --- a/ydb/core/tx/schemeshard/ut_helpers/ls_checks.h +++ b/ydb/core/tx/schemeshard/ut_helpers/ls_checks.h @@ -133,6 +133,7 @@ namespace NLs { TCheckFunc StreamFormat(NKikimrSchemeOp::ECdcStreamFormat format); TCheckFunc StreamState(NKikimrSchemeOp::ECdcStreamState state); TCheckFunc StreamVirtualTimestamps(bool value); + TCheckFunc StreamResolvedTimestamps(const TDuration& value); TCheckFunc StreamAwsRegion(const TString& value); TCheckFunc RetentionPeriod(const TDuration& value); diff --git a/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema b/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema index bc2e446de4..8e408d9573 100644 --- a/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema +++ b/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema @@ -6190,6 +6190,11 @@ "ColumnId": 8, "ColumnName": "AwsRegion", "ColumnType": "Utf8" + }, + { + "ColumnId": 9, + "ColumnName": "ResolvedTimestampsIntervalMs", + "ColumnType": "Uint64" } ], "ColumnsDropped": [], @@ -6203,7 +6208,8 @@ 5, 6, 7, - 8 + 8, + 9 ], "RoomID": 0, "Codec": 0, @@ -6268,6 +6274,11 @@ "ColumnId": 8, "ColumnName": "AwsRegion", "ColumnType": "Utf8" + }, + { + "ColumnId": 9, + "ColumnName": "ResolvedTimestampsIntervalMs", + "ColumnType": "Uint64" } ], "ColumnsDropped": [], @@ -6281,7 +6292,8 @@ 5, 6, 7, - 8 + 8, + 9 ], "RoomID": 0, "Codec": 0, |