diff options
author | Ilnaz Nizametdinov <ilnaz@ydb.tech> | 2025-05-30 13:01:29 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2025-05-30 13:01:29 +0300 |
commit | 28e25ffe9e77d853d7c392b63973348843c7cbbe (patch) | |
tree | 3b3b77341738adb40613c65b2695ebc8bd05859e | |
parent | eb36b7e30877ed5668502297cb6ff750d50d75c3 (diff) | |
download | ydb-28e25ffe9e77d853d7c392b63973348843c7cbbe.tar.gz |
Add `SchemaChanges` option to changefeed's description (#19037)
11 files changed, 123 insertions, 4 deletions
diff --git a/ydb/core/protos/flat_scheme_op.proto b/ydb/core/protos/flat_scheme_op.proto index 56db83b2b5a..91808b4083b 100644 --- a/ydb/core/protos/flat_scheme_op.proto +++ b/ydb/core/protos/flat_scheme_op.proto @@ -1025,6 +1025,7 @@ message TCdcStreamDescription { // Set to '0' to disable resolved timestamps optional uint64 ResolvedTimestampsIntervalMs = 10; optional TCdcStreamScanProgress ScanProgress = 11; + optional bool SchemaChanges = 12; } message TCreateCdcStream { diff --git a/ydb/core/tx/schemeshard/schemeshard__init.cpp b/ydb/core/tx/schemeshard/schemeshard__init.cpp index 284f9cf38a5..b362f23645b 100644 --- a/ydb/core/tx/schemeshard/schemeshard__init.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__init.cpp @@ -3049,6 +3049,7 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> { .WithFormat(rowset.GetValue<Schema::CdcStream::Format>()) .WithVirtualTimestamps(rowset.GetValueOrDefault<Schema::CdcStream::VirtualTimestamps>(false)) .WithResolvedTimestamps(TDuration::MilliSeconds(rowset.GetValueOrDefault<Schema::CdcStream::ResolvedTimestampsIntervalMs>(0))) + .WithSchemaChanges(rowset.GetValueOrDefault<Schema::CdcStream::SchemaChanges>(false)) .WithAwsRegion(rowset.GetValue<Schema::CdcStream::AwsRegion>()) .WithState(rowset.GetValue<Schema::CdcStream::State>()); @@ -3094,6 +3095,7 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> { .WithFormat(rowset.GetValue<Schema::CdcStreamAlterData::Format>()) .WithVirtualTimestamps(rowset.GetValueOrDefault<Schema::CdcStreamAlterData::VirtualTimestamps>(false)) .WithResolvedTimestamps(TDuration::MilliSeconds(rowset.GetValueOrDefault<Schema::CdcStreamAlterData::ResolvedTimestampsIntervalMs>(0))) + .WithSchemaChanges(rowset.GetValueOrDefault<Schema::CdcStreamAlterData::SchemaChanges>(false)) .WithAwsRegion(rowset.GetValue<Schema::CdcStreamAlterData::AwsRegion>()) .WithState(rowset.GetValue<Schema::CdcStreamAlterData::State>()); diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp index 6db5e4e6f55..45df09671a3 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp @@ -272,6 +272,17 @@ public: } } + if (streamDesc.GetSchemaChanges()) { + switch (streamDesc.GetFormat()) { + case NKikimrSchemeOp::ECdcStreamFormatJson: + break; + default: + result->SetError(NKikimrScheme::StatusInvalidParameter, + "SCHEMA_CHANGES incompatible with specified stream format"); + return result; + } + } + TString errStr; if (!context.SS->CheckLocks(tablePath.Base()->PathId, Transaction, errStr)) { result->SetError(NKikimrScheme::StatusMultipleModifications, errStr); diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.cpp b/ydb/core/tx/schemeshard/schemeshard_impl.cpp index 570f5353ac4..92c566df1f3 100644 --- a/ydb/core/tx/schemeshard/schemeshard_impl.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_impl.cpp @@ -1834,6 +1834,7 @@ void TSchemeShard::PersistCdcStream(NIceDb::TNiceDb& db, const TPathId& pathId) NIceDb::TUpdate<Schema::CdcStream::Format>(alterData->Format), NIceDb::TUpdate<Schema::CdcStream::VirtualTimestamps>(alterData->VirtualTimestamps), NIceDb::TUpdate<Schema::CdcStream::ResolvedTimestampsIntervalMs>(alterData->ResolvedTimestamps.MilliSeconds()), + NIceDb::TUpdate<Schema::CdcStream::SchemaChanges>(alterData->SchemaChanges), NIceDb::TUpdate<Schema::CdcStream::AwsRegion>(alterData->AwsRegion), NIceDb::TUpdate<Schema::CdcStream::State>(alterData->State) ); @@ -1860,6 +1861,7 @@ void TSchemeShard::PersistCdcStreamAlterData(NIceDb::TNiceDb& db, const TPathId& NIceDb::TUpdate<Schema::CdcStreamAlterData::Format>(alterData->Format), NIceDb::TUpdate<Schema::CdcStreamAlterData::VirtualTimestamps>(alterData->VirtualTimestamps), NIceDb::TUpdate<Schema::CdcStreamAlterData::ResolvedTimestampsIntervalMs>(alterData->ResolvedTimestamps.MilliSeconds()), + NIceDb::TUpdate<Schema::CdcStreamAlterData::SchemaChanges>(alterData->SchemaChanges), NIceDb::TUpdate<Schema::CdcStreamAlterData::AwsRegion>(alterData->AwsRegion), NIceDb::TUpdate<Schema::CdcStreamAlterData::State>(alterData->State) ); diff --git a/ydb/core/tx/schemeshard/schemeshard_info_types.h b/ydb/core/tx/schemeshard/schemeshard_info_types.h index 547ead69191..376d737c643 100644 --- a/ydb/core/tx/schemeshard/schemeshard_info_types.h +++ b/ydb/core/tx/schemeshard/schemeshard_info_types.h @@ -2507,6 +2507,7 @@ struct TCdcStreamSettings { OPTION(EFormat, Format); OPTION(bool, VirtualTimestamps); OPTION(TDuration, ResolvedTimestamps); + OPTION(bool, SchemaChanges); OPTION(TString, AwsRegion); OPTION(EState, State); @@ -2554,6 +2555,7 @@ struct TCdcStreamInfo .WithFormat(desc.GetFormat()) .WithVirtualTimestamps(desc.GetVirtualTimestamps()) .WithResolvedTimestamps(TDuration::MilliSeconds(desc.GetResolvedTimestampsIntervalMs())) + .WithSchemaChanges(desc.GetSchemaChanges()) .WithAwsRegion(desc.GetAwsRegion())); TPtr alterData = result->CreateNextVersion(); alterData->State = EState::ECdcStreamStateReady; @@ -2570,6 +2572,7 @@ struct TCdcStreamInfo desc.SetFormat(Format); desc.SetVirtualTimestamps(VirtualTimestamps); desc.SetResolvedTimestampsIntervalMs(ResolvedTimestamps.MilliSeconds()); + desc.SetSchemaChanges(SchemaChanges); desc.SetAwsRegion(AwsRegion); desc.SetState(State); if (ScanShards) { diff --git a/ydb/core/tx/schemeshard/schemeshard_schema.h b/ydb/core/tx/schemeshard/schemeshard_schema.h index 7025b9fe91c..c920a3de738 100644 --- a/ydb/core/tx/schemeshard/schemeshard_schema.h +++ b/ydb/core/tx/schemeshard/schemeshard_schema.h @@ -1729,6 +1729,7 @@ struct Schema : NIceDb::Schema { struct VirtualTimestamps : Column<7, NScheme::NTypeIds::Bool> {}; struct AwsRegion : Column<8, NScheme::NTypeIds::Utf8> {}; struct ResolvedTimestampsIntervalMs : Column<9, NScheme::NTypeIds::Uint64> {}; + struct SchemaChanges: Column<10, NScheme::NTypeIds::Bool> {}; using TKey = TableKey<OwnerPathId, LocalPathId>; using TColumns = TableColumns< @@ -1740,7 +1741,8 @@ struct Schema : NIceDb::Schema { Format, VirtualTimestamps, AwsRegion, - ResolvedTimestampsIntervalMs + ResolvedTimestampsIntervalMs, + SchemaChanges >; }; @@ -1754,6 +1756,7 @@ struct Schema : NIceDb::Schema { struct VirtualTimestamps : Column<7, NScheme::NTypeIds::Bool> {}; struct AwsRegion : Column<8, NScheme::NTypeIds::Utf8> {}; struct ResolvedTimestampsIntervalMs : Column<9, NScheme::NTypeIds::Uint64> {}; + struct SchemaChanges: Column<10, NScheme::NTypeIds::Bool> {}; using TKey = TableKey<OwnerPathId, LocalPathId>; using TColumns = TableColumns< @@ -1765,7 +1768,8 @@ struct Schema : NIceDb::Schema { Format, VirtualTimestamps, AwsRegion, - ResolvedTimestampsIntervalMs + ResolvedTimestampsIntervalMs, + SchemaChanges >; }; diff --git a/ydb/core/tx/schemeshard/ut_cdc_stream/ut_cdc_stream.cpp b/ydb/core/tx/schemeshard/ut_cdc_stream/ut_cdc_stream.cpp index 14b7c17cef1..1fbedaf6a9c 100644 --- a/ydb/core/tx/schemeshard/ut_cdc_stream/ut_cdc_stream.cpp +++ b/ydb/core/tx/schemeshard/ut_cdc_stream/ut_cdc_stream.cpp @@ -164,6 +164,49 @@ Y_UNIT_TEST_SUITE(TCdcStreamTests) { } } + Y_UNIT_TEST(SchemaChanges) { + TTestBasicRuntime runtime; + TTestEnv env(runtime); + ui64 txId = 100; + + TestCreateTable(runtime, ++txId, "/MyRoot", R"( + Name: "Table" + Columns { Name: "key" Type: "Uint64" } + Columns { Name: "value" Type: "Uint64" } + KeyColumnNames: ["key"] + )"); + env.TestWaitNotification(runtime, txId); + + for (const char* format : TVector<const char*>{"Json"}) { + TestCreateCdcStream(runtime, ++txId, "/MyRoot", Sprintf(R"( + TableName: "Table" + StreamDescription { + Name: "Stream%s" + Mode: ECdcStreamModeKeysOnly + Format: ECdcStreamFormat%s + SchemaChanges: true + } + )", format, format)); + env.TestWaitNotification(runtime, txId); + + TestDescribeResult(DescribePrivatePath(runtime, Sprintf("/MyRoot/Table/Stream%s", format)), { + NLs::StreamSchemaChanges(true), + }); + } + + for (const char* format : TVector<const char*>{"Proto", "DynamoDBStreamsJson", "DebeziumJson"}) { + TestCreateCdcStream(runtime, ++txId, "/MyRoot", Sprintf(R"( + TableName: "Table" + StreamDescription { + Name: "Stream%s" + Mode: ECdcStreamModeKeysOnly + Format: ECdcStreamFormat%s + SchemaChanges: true + } + )", format, format), {NKikimrScheme::StatusInvalidParameter}); + } + } + Y_UNIT_TEST(RetentionPeriod) { TTestBasicRuntime runtime; TTestEnv env(runtime, TTestEnvOptions().EnableProtoSourceIdInfo(true)); diff --git a/ydb/core/tx/schemeshard/ut_cdc_stream_reboots/ut_cdc_stream_reboots.cpp b/ydb/core/tx/schemeshard/ut_cdc_stream_reboots/ut_cdc_stream_reboots.cpp index 19f1f52d318..36b4770451f 100644 --- a/ydb/core/tx/schemeshard/ut_cdc_stream_reboots/ut_cdc_stream_reboots.cpp +++ b/ydb/core/tx/schemeshard/ut_cdc_stream_reboots/ut_cdc_stream_reboots.cpp @@ -177,6 +177,40 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) { }); } + Y_UNIT_TEST_WITH_REBOOTS(CreateStreamWithSchemaChanges) { + T t; + t.Run([&](TTestActorRuntime& runtime, bool& activeZone) { + { + TInactiveZone inactive(activeZone); + TestCreateTable(runtime, ++t.TxId, "/MyRoot", R"( + Name: "Table" + Columns { Name: "key" Type: "Uint64" } + Columns { Name: "value" Type: "Uint64" } + KeyColumnNames: ["key"] + )"); + t.TestEnv->TestWaitNotification(runtime, t.TxId); + } + + TestCreateCdcStream(runtime, ++t.TxId, "/MyRoot", R"( + TableName: "Table" + StreamDescription { + Name: "Stream" + Mode: ECdcStreamModeKeysOnly + Format: ECdcStreamFormatJson + SchemaChanges: true + } + )"); + t.TestEnv->TestWaitNotification(runtime, t.TxId); + + { + TInactiveZone inactive(activeZone); + TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/Stream"), { + NLs::StreamSchemaChanges(true), + }); + } + }); + } + Y_UNIT_TEST_WITH_REBOOTS(DisableStream) { T t; t.Run([&](TTestActorRuntime& runtime, bool& activeZone) { diff --git a/ydb/core/tx/schemeshard/ut_helpers/ls_checks.cpp b/ydb/core/tx/schemeshard/ut_helpers/ls_checks.cpp index 71488d79dab..82a883c69c3 100644 --- a/ydb/core/tx/schemeshard/ut_helpers/ls_checks.cpp +++ b/ydb/core/tx/schemeshard/ut_helpers/ls_checks.cpp @@ -954,6 +954,12 @@ TCheckFunc StreamResolvedTimestamps(const TDuration& value) { }; } +TCheckFunc StreamSchemaChanges(bool value) { + return [=] (const NKikimrScheme::TEvDescribeSchemeResult& record) { + UNIT_ASSERT_VALUES_EQUAL(record.GetPathDescription().GetCdcStreamDescription().GetSchemaChanges(), value); + }; +} + TCheckFunc StreamAwsRegion(const TString& value) { return [=] (const NKikimrScheme::TEvDescribeSchemeResult& record) { UNIT_ASSERT_VALUES_EQUAL(record.GetPathDescription().GetCdcStreamDescription().GetAwsRegion(), value); diff --git a/ydb/core/tx/schemeshard/ut_helpers/ls_checks.h b/ydb/core/tx/schemeshard/ut_helpers/ls_checks.h index a7dbd864400..416ba5cb55a 100644 --- a/ydb/core/tx/schemeshard/ut_helpers/ls_checks.h +++ b/ydb/core/tx/schemeshard/ut_helpers/ls_checks.h @@ -184,6 +184,7 @@ namespace NLs { TCheckFunc StreamState(NKikimrSchemeOp::ECdcStreamState state); TCheckFunc StreamVirtualTimestamps(bool value); TCheckFunc StreamResolvedTimestamps(const TDuration& value); + TCheckFunc StreamSchemaChanges(bool value); TCheckFunc StreamAwsRegion(const TString& value); TCheckFunc StreamInitialScanProgress(ui32 total, ui32 completed); TCheckFunc RetentionPeriod(const TDuration& value); diff --git a/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema b/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema index 0ac3fcc34d4..f909c7a8d51 100644 --- a/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema +++ b/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema @@ -6894,6 +6894,11 @@ "ColumnId": 9, "ColumnName": "ResolvedTimestampsIntervalMs", "ColumnType": "Uint64" + }, + { + "ColumnId": 10, + "ColumnName": "SchemaChanges", + "ColumnType": "Bool" } ], "ColumnsDropped": [], @@ -6908,7 +6913,8 @@ 6, 7, 8, - 9 + 9, + 10 ], "RoomID": 0, "Codec": 0, @@ -6981,6 +6987,11 @@ "ColumnId": 9, "ColumnName": "ResolvedTimestampsIntervalMs", "ColumnType": "Uint64" + }, + { + "ColumnId": 10, + "ColumnName": "SchemaChanges", + "ColumnType": "Bool" } ], "ColumnsDropped": [], @@ -6995,7 +7006,8 @@ 6, 7, 8, - 9 + 9, + 10 ], "RoomID": 0, "Codec": 0, |