aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorIlnaz Nizametdinov <ilnaz@ydb.tech>2025-05-30 13:01:29 +0300
committerGitHub <noreply@github.com>2025-05-30 13:01:29 +0300
commit28e25ffe9e77d853d7c392b63973348843c7cbbe (patch)
tree3b3b77341738adb40613c65b2695ebc8bd05859e
parenteb36b7e30877ed5668502297cb6ff750d50d75c3 (diff)
downloadydb-28e25ffe9e77d853d7c392b63973348843c7cbbe.tar.gz
Add `SchemaChanges` option to changefeed's description (#19037)
-rw-r--r--ydb/core/protos/flat_scheme_op.proto1
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__init.cpp2
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp11
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_impl.cpp2
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_info_types.h3
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_schema.h8
-rw-r--r--ydb/core/tx/schemeshard/ut_cdc_stream/ut_cdc_stream.cpp43
-rw-r--r--ydb/core/tx/schemeshard/ut_cdc_stream_reboots/ut_cdc_stream_reboots.cpp34
-rw-r--r--ydb/core/tx/schemeshard/ut_helpers/ls_checks.cpp6
-rw-r--r--ydb/core/tx/schemeshard/ut_helpers/ls_checks.h1
-rw-r--r--ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema16
11 files changed, 123 insertions, 4 deletions
diff --git a/ydb/core/protos/flat_scheme_op.proto b/ydb/core/protos/flat_scheme_op.proto
index 56db83b2b5a..91808b4083b 100644
--- a/ydb/core/protos/flat_scheme_op.proto
+++ b/ydb/core/protos/flat_scheme_op.proto
@@ -1025,6 +1025,7 @@ message TCdcStreamDescription {
// Set to '0' to disable resolved timestamps
optional uint64 ResolvedTimestampsIntervalMs = 10;
optional TCdcStreamScanProgress ScanProgress = 11;
+ optional bool SchemaChanges = 12;
}
message TCreateCdcStream {
diff --git a/ydb/core/tx/schemeshard/schemeshard__init.cpp b/ydb/core/tx/schemeshard/schemeshard__init.cpp
index 284f9cf38a5..b362f23645b 100644
--- a/ydb/core/tx/schemeshard/schemeshard__init.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__init.cpp
@@ -3049,6 +3049,7 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
.WithFormat(rowset.GetValue<Schema::CdcStream::Format>())
.WithVirtualTimestamps(rowset.GetValueOrDefault<Schema::CdcStream::VirtualTimestamps>(false))
.WithResolvedTimestamps(TDuration::MilliSeconds(rowset.GetValueOrDefault<Schema::CdcStream::ResolvedTimestampsIntervalMs>(0)))
+ .WithSchemaChanges(rowset.GetValueOrDefault<Schema::CdcStream::SchemaChanges>(false))
.WithAwsRegion(rowset.GetValue<Schema::CdcStream::AwsRegion>())
.WithState(rowset.GetValue<Schema::CdcStream::State>());
@@ -3094,6 +3095,7 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
.WithFormat(rowset.GetValue<Schema::CdcStreamAlterData::Format>())
.WithVirtualTimestamps(rowset.GetValueOrDefault<Schema::CdcStreamAlterData::VirtualTimestamps>(false))
.WithResolvedTimestamps(TDuration::MilliSeconds(rowset.GetValueOrDefault<Schema::CdcStreamAlterData::ResolvedTimestampsIntervalMs>(0)))
+ .WithSchemaChanges(rowset.GetValueOrDefault<Schema::CdcStreamAlterData::SchemaChanges>(false))
.WithAwsRegion(rowset.GetValue<Schema::CdcStreamAlterData::AwsRegion>())
.WithState(rowset.GetValue<Schema::CdcStreamAlterData::State>());
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp
index 6db5e4e6f55..45df09671a3 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp
@@ -272,6 +272,17 @@ public:
}
}
+ if (streamDesc.GetSchemaChanges()) {
+ switch (streamDesc.GetFormat()) {
+ case NKikimrSchemeOp::ECdcStreamFormatJson:
+ break;
+ default:
+ result->SetError(NKikimrScheme::StatusInvalidParameter,
+ "SCHEMA_CHANGES incompatible with specified stream format");
+ return result;
+ }
+ }
+
TString errStr;
if (!context.SS->CheckLocks(tablePath.Base()->PathId, Transaction, errStr)) {
result->SetError(NKikimrScheme::StatusMultipleModifications, errStr);
diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.cpp b/ydb/core/tx/schemeshard/schemeshard_impl.cpp
index 570f5353ac4..92c566df1f3 100644
--- a/ydb/core/tx/schemeshard/schemeshard_impl.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard_impl.cpp
@@ -1834,6 +1834,7 @@ void TSchemeShard::PersistCdcStream(NIceDb::TNiceDb& db, const TPathId& pathId)
NIceDb::TUpdate<Schema::CdcStream::Format>(alterData->Format),
NIceDb::TUpdate<Schema::CdcStream::VirtualTimestamps>(alterData->VirtualTimestamps),
NIceDb::TUpdate<Schema::CdcStream::ResolvedTimestampsIntervalMs>(alterData->ResolvedTimestamps.MilliSeconds()),
+ NIceDb::TUpdate<Schema::CdcStream::SchemaChanges>(alterData->SchemaChanges),
NIceDb::TUpdate<Schema::CdcStream::AwsRegion>(alterData->AwsRegion),
NIceDb::TUpdate<Schema::CdcStream::State>(alterData->State)
);
@@ -1860,6 +1861,7 @@ void TSchemeShard::PersistCdcStreamAlterData(NIceDb::TNiceDb& db, const TPathId&
NIceDb::TUpdate<Schema::CdcStreamAlterData::Format>(alterData->Format),
NIceDb::TUpdate<Schema::CdcStreamAlterData::VirtualTimestamps>(alterData->VirtualTimestamps),
NIceDb::TUpdate<Schema::CdcStreamAlterData::ResolvedTimestampsIntervalMs>(alterData->ResolvedTimestamps.MilliSeconds()),
+ NIceDb::TUpdate<Schema::CdcStreamAlterData::SchemaChanges>(alterData->SchemaChanges),
NIceDb::TUpdate<Schema::CdcStreamAlterData::AwsRegion>(alterData->AwsRegion),
NIceDb::TUpdate<Schema::CdcStreamAlterData::State>(alterData->State)
);
diff --git a/ydb/core/tx/schemeshard/schemeshard_info_types.h b/ydb/core/tx/schemeshard/schemeshard_info_types.h
index 547ead69191..376d737c643 100644
--- a/ydb/core/tx/schemeshard/schemeshard_info_types.h
+++ b/ydb/core/tx/schemeshard/schemeshard_info_types.h
@@ -2507,6 +2507,7 @@ struct TCdcStreamSettings {
OPTION(EFormat, Format);
OPTION(bool, VirtualTimestamps);
OPTION(TDuration, ResolvedTimestamps);
+ OPTION(bool, SchemaChanges);
OPTION(TString, AwsRegion);
OPTION(EState, State);
@@ -2554,6 +2555,7 @@ struct TCdcStreamInfo
.WithFormat(desc.GetFormat())
.WithVirtualTimestamps(desc.GetVirtualTimestamps())
.WithResolvedTimestamps(TDuration::MilliSeconds(desc.GetResolvedTimestampsIntervalMs()))
+ .WithSchemaChanges(desc.GetSchemaChanges())
.WithAwsRegion(desc.GetAwsRegion()));
TPtr alterData = result->CreateNextVersion();
alterData->State = EState::ECdcStreamStateReady;
@@ -2570,6 +2572,7 @@ struct TCdcStreamInfo
desc.SetFormat(Format);
desc.SetVirtualTimestamps(VirtualTimestamps);
desc.SetResolvedTimestampsIntervalMs(ResolvedTimestamps.MilliSeconds());
+ desc.SetSchemaChanges(SchemaChanges);
desc.SetAwsRegion(AwsRegion);
desc.SetState(State);
if (ScanShards) {
diff --git a/ydb/core/tx/schemeshard/schemeshard_schema.h b/ydb/core/tx/schemeshard/schemeshard_schema.h
index 7025b9fe91c..c920a3de738 100644
--- a/ydb/core/tx/schemeshard/schemeshard_schema.h
+++ b/ydb/core/tx/schemeshard/schemeshard_schema.h
@@ -1729,6 +1729,7 @@ struct Schema : NIceDb::Schema {
struct VirtualTimestamps : Column<7, NScheme::NTypeIds::Bool> {};
struct AwsRegion : Column<8, NScheme::NTypeIds::Utf8> {};
struct ResolvedTimestampsIntervalMs : Column<9, NScheme::NTypeIds::Uint64> {};
+ struct SchemaChanges: Column<10, NScheme::NTypeIds::Bool> {};
using TKey = TableKey<OwnerPathId, LocalPathId>;
using TColumns = TableColumns<
@@ -1740,7 +1741,8 @@ struct Schema : NIceDb::Schema {
Format,
VirtualTimestamps,
AwsRegion,
- ResolvedTimestampsIntervalMs
+ ResolvedTimestampsIntervalMs,
+ SchemaChanges
>;
};
@@ -1754,6 +1756,7 @@ struct Schema : NIceDb::Schema {
struct VirtualTimestamps : Column<7, NScheme::NTypeIds::Bool> {};
struct AwsRegion : Column<8, NScheme::NTypeIds::Utf8> {};
struct ResolvedTimestampsIntervalMs : Column<9, NScheme::NTypeIds::Uint64> {};
+ struct SchemaChanges: Column<10, NScheme::NTypeIds::Bool> {};
using TKey = TableKey<OwnerPathId, LocalPathId>;
using TColumns = TableColumns<
@@ -1765,7 +1768,8 @@ struct Schema : NIceDb::Schema {
Format,
VirtualTimestamps,
AwsRegion,
- ResolvedTimestampsIntervalMs
+ ResolvedTimestampsIntervalMs,
+ SchemaChanges
>;
};
diff --git a/ydb/core/tx/schemeshard/ut_cdc_stream/ut_cdc_stream.cpp b/ydb/core/tx/schemeshard/ut_cdc_stream/ut_cdc_stream.cpp
index 14b7c17cef1..1fbedaf6a9c 100644
--- a/ydb/core/tx/schemeshard/ut_cdc_stream/ut_cdc_stream.cpp
+++ b/ydb/core/tx/schemeshard/ut_cdc_stream/ut_cdc_stream.cpp
@@ -164,6 +164,49 @@ Y_UNIT_TEST_SUITE(TCdcStreamTests) {
}
}
+ Y_UNIT_TEST(SchemaChanges) {
+ TTestBasicRuntime runtime;
+ TTestEnv env(runtime);
+ ui64 txId = 100;
+
+ TestCreateTable(runtime, ++txId, "/MyRoot", R"(
+ Name: "Table"
+ Columns { Name: "key" Type: "Uint64" }
+ Columns { Name: "value" Type: "Uint64" }
+ KeyColumnNames: ["key"]
+ )");
+ env.TestWaitNotification(runtime, txId);
+
+ for (const char* format : TVector<const char*>{"Json"}) {
+ TestCreateCdcStream(runtime, ++txId, "/MyRoot", Sprintf(R"(
+ TableName: "Table"
+ StreamDescription {
+ Name: "Stream%s"
+ Mode: ECdcStreamModeKeysOnly
+ Format: ECdcStreamFormat%s
+ SchemaChanges: true
+ }
+ )", format, format));
+ env.TestWaitNotification(runtime, txId);
+
+ TestDescribeResult(DescribePrivatePath(runtime, Sprintf("/MyRoot/Table/Stream%s", format)), {
+ NLs::StreamSchemaChanges(true),
+ });
+ }
+
+ for (const char* format : TVector<const char*>{"Proto", "DynamoDBStreamsJson", "DebeziumJson"}) {
+ TestCreateCdcStream(runtime, ++txId, "/MyRoot", Sprintf(R"(
+ TableName: "Table"
+ StreamDescription {
+ Name: "Stream%s"
+ Mode: ECdcStreamModeKeysOnly
+ Format: ECdcStreamFormat%s
+ SchemaChanges: true
+ }
+ )", format, format), {NKikimrScheme::StatusInvalidParameter});
+ }
+ }
+
Y_UNIT_TEST(RetentionPeriod) {
TTestBasicRuntime runtime;
TTestEnv env(runtime, TTestEnvOptions().EnableProtoSourceIdInfo(true));
diff --git a/ydb/core/tx/schemeshard/ut_cdc_stream_reboots/ut_cdc_stream_reboots.cpp b/ydb/core/tx/schemeshard/ut_cdc_stream_reboots/ut_cdc_stream_reboots.cpp
index 19f1f52d318..36b4770451f 100644
--- a/ydb/core/tx/schemeshard/ut_cdc_stream_reboots/ut_cdc_stream_reboots.cpp
+++ b/ydb/core/tx/schemeshard/ut_cdc_stream_reboots/ut_cdc_stream_reboots.cpp
@@ -177,6 +177,40 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) {
});
}
+ Y_UNIT_TEST_WITH_REBOOTS(CreateStreamWithSchemaChanges) {
+ T t;
+ t.Run([&](TTestActorRuntime& runtime, bool& activeZone) {
+ {
+ TInactiveZone inactive(activeZone);
+ TestCreateTable(runtime, ++t.TxId, "/MyRoot", R"(
+ Name: "Table"
+ Columns { Name: "key" Type: "Uint64" }
+ Columns { Name: "value" Type: "Uint64" }
+ KeyColumnNames: ["key"]
+ )");
+ t.TestEnv->TestWaitNotification(runtime, t.TxId);
+ }
+
+ TestCreateCdcStream(runtime, ++t.TxId, "/MyRoot", R"(
+ TableName: "Table"
+ StreamDescription {
+ Name: "Stream"
+ Mode: ECdcStreamModeKeysOnly
+ Format: ECdcStreamFormatJson
+ SchemaChanges: true
+ }
+ )");
+ t.TestEnv->TestWaitNotification(runtime, t.TxId);
+
+ {
+ TInactiveZone inactive(activeZone);
+ TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/Stream"), {
+ NLs::StreamSchemaChanges(true),
+ });
+ }
+ });
+ }
+
Y_UNIT_TEST_WITH_REBOOTS(DisableStream) {
T t;
t.Run([&](TTestActorRuntime& runtime, bool& activeZone) {
diff --git a/ydb/core/tx/schemeshard/ut_helpers/ls_checks.cpp b/ydb/core/tx/schemeshard/ut_helpers/ls_checks.cpp
index 71488d79dab..82a883c69c3 100644
--- a/ydb/core/tx/schemeshard/ut_helpers/ls_checks.cpp
+++ b/ydb/core/tx/schemeshard/ut_helpers/ls_checks.cpp
@@ -954,6 +954,12 @@ TCheckFunc StreamResolvedTimestamps(const TDuration& value) {
};
}
+TCheckFunc StreamSchemaChanges(bool value) {
+ return [=] (const NKikimrScheme::TEvDescribeSchemeResult& record) {
+ UNIT_ASSERT_VALUES_EQUAL(record.GetPathDescription().GetCdcStreamDescription().GetSchemaChanges(), value);
+ };
+}
+
TCheckFunc StreamAwsRegion(const TString& value) {
return [=] (const NKikimrScheme::TEvDescribeSchemeResult& record) {
UNIT_ASSERT_VALUES_EQUAL(record.GetPathDescription().GetCdcStreamDescription().GetAwsRegion(), value);
diff --git a/ydb/core/tx/schemeshard/ut_helpers/ls_checks.h b/ydb/core/tx/schemeshard/ut_helpers/ls_checks.h
index a7dbd864400..416ba5cb55a 100644
--- a/ydb/core/tx/schemeshard/ut_helpers/ls_checks.h
+++ b/ydb/core/tx/schemeshard/ut_helpers/ls_checks.h
@@ -184,6 +184,7 @@ namespace NLs {
TCheckFunc StreamState(NKikimrSchemeOp::ECdcStreamState state);
TCheckFunc StreamVirtualTimestamps(bool value);
TCheckFunc StreamResolvedTimestamps(const TDuration& value);
+ TCheckFunc StreamSchemaChanges(bool value);
TCheckFunc StreamAwsRegion(const TString& value);
TCheckFunc StreamInitialScanProgress(ui32 total, ui32 completed);
TCheckFunc RetentionPeriod(const TDuration& value);
diff --git a/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema b/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema
index 0ac3fcc34d4..f909c7a8d51 100644
--- a/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema
+++ b/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema
@@ -6894,6 +6894,11 @@
"ColumnId": 9,
"ColumnName": "ResolvedTimestampsIntervalMs",
"ColumnType": "Uint64"
+ },
+ {
+ "ColumnId": 10,
+ "ColumnName": "SchemaChanges",
+ "ColumnType": "Bool"
}
],
"ColumnsDropped": [],
@@ -6908,7 +6913,8 @@
6,
7,
8,
- 9
+ 9,
+ 10
],
"RoomID": 0,
"Codec": 0,
@@ -6981,6 +6987,11 @@
"ColumnId": 9,
"ColumnName": "ResolvedTimestampsIntervalMs",
"ColumnType": "Uint64"
+ },
+ {
+ "ColumnId": 10,
+ "ColumnName": "SchemaChanges",
+ "ColumnType": "Bool"
}
],
"ColumnsDropped": [],
@@ -6995,7 +7006,8 @@
6,
7,
8,
- 9
+ 9,
+ 10
],
"RoomID": 0,
"Codec": 0,