aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorilnaz <ilnaz@ydb.tech>2023-05-30 18:19:27 +0300
committerilnaz <ilnaz@ydb.tech>2023-05-30 18:19:27 +0300
commitaaba39f70f793486278aa4453d51119d3c19d256 (patch)
tree3d29adb67f1f5d07120435950326b06d497bcd8b
parente6aaf084808980d163b4c04ccb77829d54d14550 (diff)
downloadydb-aaba39f70f793486278aa4453d51119d3c19d256.tar.gz
ResolvedTimestamps option in cdc stream description
-rw-r--r--ydb/core/protos/flat_scheme_op.proto2
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__init.cpp8
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_impl.cpp2
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_info_types.h11
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_path_describer.cpp1
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_schema.h26
-rw-r--r--ydb/core/tx/schemeshard/ut_cdc_stream.cpp29
-rw-r--r--ydb/core/tx/schemeshard/ut_cdc_stream_reboots.cpp34
-rw-r--r--ydb/core/tx/schemeshard/ut_helpers/ls_checks.cpp6
-rw-r--r--ydb/core/tx/schemeshard/ut_helpers/ls_checks.h1
-rw-r--r--ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema16
11 files changed, 125 insertions, 11 deletions
diff --git a/ydb/core/protos/flat_scheme_op.proto b/ydb/core/protos/flat_scheme_op.proto
index fa09828683..3c60c397d7 100644
--- a/ydb/core/protos/flat_scheme_op.proto
+++ b/ydb/core/protos/flat_scheme_op.proto
@@ -745,6 +745,8 @@ message TCdcStreamDescription {
repeated TUserAttribute UserAttributes = 8;
// AwsRegion used to mark records in DynamoDB-compatible mode (FormatDynamoDBStreamsJson)
optional string AwsRegion = 9;
+ // Set to '0' to disable resolved timestamps
+ optional uint64 ResolvedTimestampsIntervalMs = 10;
}
message TCreateCdcStream {
diff --git a/ydb/core/tx/schemeshard/schemeshard__init.cpp b/ydb/core/tx/schemeshard/schemeshard__init.cpp
index 9fc20106e1..88586f1310 100644
--- a/ydb/core/tx/schemeshard/schemeshard__init.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__init.cpp
@@ -2822,6 +2822,7 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
auto mode = rowset.GetValue<Schema::CdcStream::Mode>();
auto format = rowset.GetValue<Schema::CdcStream::Format>();
auto vt = rowset.GetValueOrDefault<Schema::CdcStream::VirtualTimestamps>(false);
+ auto rt = TDuration::MilliSeconds(rowset.GetValueOrDefault<Schema::CdcStream::ResolvedTimestampsIntervalMs>(0));
auto awsRegion = rowset.GetValue<Schema::CdcStream::AwsRegion>();
auto state = rowset.GetValue<Schema::CdcStream::State>();
@@ -2833,7 +2834,7 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
<< ", path type: " << NKikimrSchemeOp::EPathType_Name(path->PathType));
Y_VERIFY(!Self->CdcStreams.contains(pathId));
- Self->CdcStreams[pathId] = new TCdcStreamInfo(alterVersion, mode, format, vt, awsRegion, state);
+ Self->CdcStreams[pathId] = new TCdcStreamInfo(alterVersion, mode, format, vt, rt, awsRegion, state);
Self->IncrementPathDbRefCount(pathId);
if (state == NKikimrSchemeOp::ECdcStreamStateScan) {
@@ -2866,6 +2867,7 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
auto mode = rowset.GetValue<Schema::CdcStreamAlterData::Mode>();
auto format = rowset.GetValue<Schema::CdcStreamAlterData::Format>();
auto vt = rowset.GetValueOrDefault<Schema::CdcStreamAlterData::VirtualTimestamps>(false);
+ auto rt = TDuration::MilliSeconds(rowset.GetValueOrDefault<Schema::CdcStreamAlterData::ResolvedTimestampsIntervalMs>(0));
auto awsRegion = rowset.GetValue<Schema::CdcStreamAlterData::AwsRegion>();
auto state = rowset.GetValue<Schema::CdcStreamAlterData::State>();
@@ -2878,14 +2880,14 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
if (!Self->CdcStreams.contains(pathId)) {
Y_VERIFY(alterVersion == 1);
- Self->CdcStreams[pathId] = TCdcStreamInfo::New(mode, format, vt, awsRegion);
+ Self->CdcStreams[pathId] = TCdcStreamInfo::New(mode, format, vt, rt, awsRegion);
Self->IncrementPathDbRefCount(pathId);
}
auto stream = Self->CdcStreams.at(pathId);
Y_VERIFY(stream->AlterData == nullptr);
Y_VERIFY(stream->AlterVersion < alterVersion);
- stream->AlterData = new TCdcStreamInfo(alterVersion, mode, format, vt, awsRegion, state);
+ stream->AlterData = new TCdcStreamInfo(alterVersion, mode, format, vt, rt, awsRegion, state);
Y_VERIFY_S(Self->PathsById.contains(path->ParentPathId), "Parent path is not found"
<< ", cdc stream pathId: " << pathId
diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.cpp b/ydb/core/tx/schemeshard/schemeshard_impl.cpp
index e5f35481e5..7a361f224f 100644
--- a/ydb/core/tx/schemeshard/schemeshard_impl.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard_impl.cpp
@@ -1546,6 +1546,7 @@ void TSchemeShard::PersistCdcStream(NIceDb::TNiceDb& db, const TPathId& pathId)
NIceDb::TUpdate<Schema::CdcStream::Mode>(alterData->Mode),
NIceDb::TUpdate<Schema::CdcStream::Format>(alterData->Format),
NIceDb::TUpdate<Schema::CdcStream::VirtualTimestamps>(alterData->VirtualTimestamps),
+ NIceDb::TUpdate<Schema::CdcStream::ResolvedTimestampsIntervalMs>(alterData->ResolvedTimestamps.MilliSeconds()),
NIceDb::TUpdate<Schema::CdcStream::AwsRegion>(alterData->AwsRegion),
NIceDb::TUpdate<Schema::CdcStream::State>(alterData->State)
);
@@ -1571,6 +1572,7 @@ void TSchemeShard::PersistCdcStreamAlterData(NIceDb::TNiceDb& db, const TPathId&
NIceDb::TUpdate<Schema::CdcStreamAlterData::Mode>(alterData->Mode),
NIceDb::TUpdate<Schema::CdcStreamAlterData::Format>(alterData->Format),
NIceDb::TUpdate<Schema::CdcStreamAlterData::VirtualTimestamps>(alterData->VirtualTimestamps),
+ NIceDb::TUpdate<Schema::CdcStreamAlterData::ResolvedTimestampsIntervalMs>(alterData->ResolvedTimestamps.MilliSeconds()),
NIceDb::TUpdate<Schema::CdcStreamAlterData::AwsRegion>(alterData->AwsRegion),
NIceDb::TUpdate<Schema::CdcStreamAlterData::State>(alterData->State)
);
diff --git a/ydb/core/tx/schemeshard/schemeshard_info_types.h b/ydb/core/tx/schemeshard/schemeshard_info_types.h
index bd9ade2fad..1acc8690e7 100644
--- a/ydb/core/tx/schemeshard/schemeshard_info_types.h
+++ b/ydb/core/tx/schemeshard/schemeshard_info_types.h
@@ -2447,11 +2447,12 @@ struct TCdcStreamInfo : public TSimpleRefCount<TCdcStreamInfo> {
static constexpr ui32 MaxInProgressShards = 10;
- TCdcStreamInfo(ui64 version, EMode mode, EFormat format, bool vt, const TString& awsRegion, EState state)
+ TCdcStreamInfo(ui64 version, EMode mode, EFormat format, bool vt, const TDuration& rt, const TString& awsRegion, EState state)
: AlterVersion(version)
, Mode(mode)
, Format(format)
, VirtualTimestamps(vt)
+ , ResolvedTimestamps(rt)
, AwsRegion(awsRegion)
, State(state)
{}
@@ -2466,12 +2467,13 @@ struct TCdcStreamInfo : public TSimpleRefCount<TCdcStreamInfo> {
return result;
}
- static TPtr New(EMode mode, EFormat format, bool vt, const TString& awsRegion) {
- return new TCdcStreamInfo(0, mode, format, vt, awsRegion, EState::ECdcStreamStateInvalid);
+ static TPtr New(EMode mode, EFormat format, bool vt, const TDuration& rt, const TString& awsRegion) {
+ return new TCdcStreamInfo(0, mode, format, vt, rt, awsRegion, EState::ECdcStreamStateInvalid);
}
static TPtr Create(const NKikimrSchemeOp::TCdcStreamDescription& desc) {
- TPtr result = New(desc.GetMode(), desc.GetFormat(), desc.GetVirtualTimestamps(), desc.GetAwsRegion());
+ TPtr result = New(desc.GetMode(), desc.GetFormat(), desc.GetVirtualTimestamps(),
+ TDuration::MilliSeconds(desc.GetResolvedTimestampsIntervalMs()), desc.GetAwsRegion());
TPtr alterData = result->CreateNextVersion();
alterData->State = EState::ECdcStreamStateReady;
if (desc.HasState()) {
@@ -2485,6 +2487,7 @@ struct TCdcStreamInfo : public TSimpleRefCount<TCdcStreamInfo> {
EMode Mode;
EFormat Format;
bool VirtualTimestamps;
+ TDuration ResolvedTimestamps;
TString AwsRegion;
EState State;
diff --git a/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp b/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp
index 9634fae398..8c40125151 100644
--- a/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp
@@ -1181,6 +1181,7 @@ void TSchemeShard::DescribeCdcStream(const TPathId& pathId, const TString& name,
desc.SetMode(info->Mode);
desc.SetFormat(info->Format);
desc.SetVirtualTimestamps(info->VirtualTimestamps);
+ desc.SetResolvedTimestampsIntervalMs(info->ResolvedTimestamps.MilliSeconds());
desc.SetAwsRegion(info->AwsRegion);
PathIdFromPathId(pathId, desc.MutablePathId());
desc.SetState(info->State);
diff --git a/ydb/core/tx/schemeshard/schemeshard_schema.h b/ydb/core/tx/schemeshard/schemeshard_schema.h
index eac72c5fe7..c9d88ec9dc 100644
--- a/ydb/core/tx/schemeshard/schemeshard_schema.h
+++ b/ydb/core/tx/schemeshard/schemeshard_schema.h
@@ -1550,9 +1550,20 @@ struct Schema : NIceDb::Schema {
struct Format : Column<6, NScheme::NTypeIds::Uint32> { using Type = NKikimrSchemeOp::ECdcStreamFormat; static constexpr Type Default = NKikimrSchemeOp::ECdcStreamFormatInvalid; };
struct VirtualTimestamps : Column<7, NScheme::NTypeIds::Bool> {};
struct AwsRegion : Column<8, NScheme::NTypeIds::Utf8> {};
+ struct ResolvedTimestampsIntervalMs : Column<9, NScheme::NTypeIds::Uint64> {};
using TKey = TableKey<OwnerPathId, LocalPathId>;
- using TColumns = TableColumns<OwnerPathId, LocalPathId, AlterVersion, State, Mode, Format, VirtualTimestamps, AwsRegion>;
+ using TColumns = TableColumns<
+ OwnerPathId,
+ LocalPathId,
+ AlterVersion,
+ State,
+ Mode,
+ Format,
+ VirtualTimestamps,
+ AwsRegion,
+ ResolvedTimestampsIntervalMs
+ >;
};
struct CdcStreamAlterData : Table<96> {
@@ -1564,9 +1575,20 @@ struct Schema : NIceDb::Schema {
struct Format : Column<6, NScheme::NTypeIds::Uint32> { using Type = NKikimrSchemeOp::ECdcStreamFormat; static constexpr Type Default = NKikimrSchemeOp::ECdcStreamFormatInvalid; };
struct VirtualTimestamps : Column<7, NScheme::NTypeIds::Bool> {};
struct AwsRegion : Column<8, NScheme::NTypeIds::Utf8> {};
+ struct ResolvedTimestampsIntervalMs : Column<9, NScheme::NTypeIds::Uint64> {};
using TKey = TableKey<OwnerPathId, LocalPathId>;
- using TColumns = TableColumns<OwnerPathId, LocalPathId, AlterVersion, State, Mode, Format, VirtualTimestamps, AwsRegion>;
+ using TColumns = TableColumns<
+ OwnerPathId,
+ LocalPathId,
+ AlterVersion,
+ State,
+ Mode,
+ Format,
+ VirtualTimestamps,
+ AwsRegion,
+ ResolvedTimestampsIntervalMs
+ >;
};
struct CdcStreamScanShardStatus : Table<103> {
diff --git a/ydb/core/tx/schemeshard/ut_cdc_stream.cpp b/ydb/core/tx/schemeshard/ut_cdc_stream.cpp
index 7946c6cf4d..3cd979308e 100644
--- a/ydb/core/tx/schemeshard/ut_cdc_stream.cpp
+++ b/ydb/core/tx/schemeshard/ut_cdc_stream.cpp
@@ -103,6 +103,35 @@ Y_UNIT_TEST_SUITE(TCdcStreamTests) {
});
}
+ Y_UNIT_TEST(ResolvedTimestamps) {
+ TTestBasicRuntime runtime;
+ TTestEnv env(runtime, TTestEnvOptions().EnableProtoSourceIdInfo(true));
+ ui64 txId = 100;
+
+ TestCreateTable(runtime, ++txId, "/MyRoot", R"(
+ Name: "Table"
+ Columns { Name: "key" Type: "Uint64" }
+ Columns { Name: "value" Type: "Uint64" }
+ KeyColumnNames: ["key"]
+ )");
+ env.TestWaitNotification(runtime, txId);
+
+ TestCreateCdcStream(runtime, ++txId, "/MyRoot", R"(
+ TableName: "Table"
+ StreamDescription {
+ Name: "Stream"
+ Mode: ECdcStreamModeKeysOnly
+ Format: ECdcStreamFormatProto
+ ResolvedTimestampsIntervalMs: 1000
+ }
+ )");
+ env.TestWaitNotification(runtime, txId);
+
+ TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/Stream"), {
+ NLs::StreamResolvedTimestamps(TDuration::MilliSeconds(1000)),
+ });
+ }
+
Y_UNIT_TEST(RetentionPeriod) {
TTestBasicRuntime runtime;
TTestEnv env(runtime, TTestEnvOptions().EnableProtoSourceIdInfo(true));
diff --git a/ydb/core/tx/schemeshard/ut_cdc_stream_reboots.cpp b/ydb/core/tx/schemeshard/ut_cdc_stream_reboots.cpp
index fb10c2298d..e5da46f3d9 100644
--- a/ydb/core/tx/schemeshard/ut_cdc_stream_reboots.cpp
+++ b/ydb/core/tx/schemeshard/ut_cdc_stream_reboots.cpp
@@ -105,6 +105,40 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) {
});
}
+ Y_UNIT_TEST(CreateStreamWithResolvedTimestamps) {
+ TTestWithReboots t;
+ t.Run([&](TTestActorRuntime& runtime, bool& activeZone) {
+ {
+ TInactiveZone inactive(activeZone);
+ TestCreateTable(runtime, ++t.TxId, "/MyRoot", R"(
+ Name: "Table"
+ Columns { Name: "key" Type: "Uint64" }
+ Columns { Name: "value" Type: "Uint64" }
+ KeyColumnNames: ["key"]
+ )");
+ t.TestEnv->TestWaitNotification(runtime, t.TxId);
+ }
+
+ TestCreateCdcStream(runtime, ++t.TxId, "/MyRoot", R"(
+ TableName: "Table"
+ StreamDescription {
+ Name: "Stream"
+ Mode: ECdcStreamModeKeysOnly
+ Format: ECdcStreamFormatProto
+ ResolvedTimestampsIntervalMs: 1000
+ }
+ )");
+ t.TestEnv->TestWaitNotification(runtime, t.TxId);
+
+ {
+ TInactiveZone inactive(activeZone);
+ TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/Stream"), {
+ NLs::StreamResolvedTimestamps(TDuration::MilliSeconds(1000)),
+ });
+ }
+ });
+ }
+
Y_UNIT_TEST(DisableStream) {
TTestWithReboots t;
t.Run([&](TTestActorRuntime& runtime, bool& activeZone) {
diff --git a/ydb/core/tx/schemeshard/ut_helpers/ls_checks.cpp b/ydb/core/tx/schemeshard/ut_helpers/ls_checks.cpp
index 4aa4cab021..162d82f206 100644
--- a/ydb/core/tx/schemeshard/ut_helpers/ls_checks.cpp
+++ b/ydb/core/tx/schemeshard/ut_helpers/ls_checks.cpp
@@ -801,6 +801,12 @@ TCheckFunc StreamVirtualTimestamps(bool value) {
};
}
+TCheckFunc StreamResolvedTimestamps(const TDuration& value) {
+ return [=] (const NKikimrScheme::TEvDescribeSchemeResult& record) {
+ UNIT_ASSERT_VALUES_EQUAL(record.GetPathDescription().GetCdcStreamDescription().GetResolvedTimestampsIntervalMs(), value.MilliSeconds());
+ };
+}
+
TCheckFunc StreamAwsRegion(const TString& value) {
return [=] (const NKikimrScheme::TEvDescribeSchemeResult& record) {
UNIT_ASSERT_VALUES_EQUAL(record.GetPathDescription().GetCdcStreamDescription().GetAwsRegion(), value);
diff --git a/ydb/core/tx/schemeshard/ut_helpers/ls_checks.h b/ydb/core/tx/schemeshard/ut_helpers/ls_checks.h
index 9738370f94..0168972abf 100644
--- a/ydb/core/tx/schemeshard/ut_helpers/ls_checks.h
+++ b/ydb/core/tx/schemeshard/ut_helpers/ls_checks.h
@@ -133,6 +133,7 @@ namespace NLs {
TCheckFunc StreamFormat(NKikimrSchemeOp::ECdcStreamFormat format);
TCheckFunc StreamState(NKikimrSchemeOp::ECdcStreamState state);
TCheckFunc StreamVirtualTimestamps(bool value);
+ TCheckFunc StreamResolvedTimestamps(const TDuration& value);
TCheckFunc StreamAwsRegion(const TString& value);
TCheckFunc RetentionPeriod(const TDuration& value);
diff --git a/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema b/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema
index bc2e446de4..8e408d9573 100644
--- a/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema
+++ b/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema
@@ -6190,6 +6190,11 @@
"ColumnId": 8,
"ColumnName": "AwsRegion",
"ColumnType": "Utf8"
+ },
+ {
+ "ColumnId": 9,
+ "ColumnName": "ResolvedTimestampsIntervalMs",
+ "ColumnType": "Uint64"
}
],
"ColumnsDropped": [],
@@ -6203,7 +6208,8 @@
5,
6,
7,
- 8
+ 8,
+ 9
],
"RoomID": 0,
"Codec": 0,
@@ -6268,6 +6274,11 @@
"ColumnId": 8,
"ColumnName": "AwsRegion",
"ColumnType": "Utf8"
+ },
+ {
+ "ColumnId": 9,
+ "ColumnName": "ResolvedTimestampsIntervalMs",
+ "ColumnType": "Uint64"
}
],
"ColumnsDropped": [],
@@ -6281,7 +6292,8 @@
5,
6,
7,
- 8
+ 8,
+ 9
],
"RoomID": 0,
"Codec": 0,