diff options
author | Ilnaz Nizametdinov <ilnaz@ydb.tech> | 2025-06-05 00:38:48 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2025-06-04 21:38:48 +0000 |
commit | 8bbfc1965ae5de65c96c36238b45ec33dd84f0a0 (patch) | |
tree | 4055764945a13a0d8beadbcea7a844e2f1064b32 | |
parent | c0ca0fdd4f20f30d9dee09caebe5e953b4827a1b (diff) | |
download | ydb-8bbfc1965ae5de65c96c36238b45ec33dd84f0a0.tar.gz |
New CHANGEFEED's option: SCHEMA_CHANGES (#19303)
-rw-r--r-- | ydb/core/kqp/provider/yql_kikimr_exec.cpp | 6 | ||||
-rw-r--r-- | ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp | 69 | ||||
-rw-r--r-- | ydb/core/sys_view/show_create/create_table_formatter.cpp | 5 | ||||
-rw-r--r-- | ydb/core/sys_view/ut_kqp.cpp | 21 | ||||
-rw-r--r-- | ydb/core/ydb_convert/table_description.cpp | 2 | ||||
-rw-r--r-- | ydb/public/api/protos/ydb_table.proto | 4 | ||||
-rw-r--r-- | ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/table/table.h | 4 | ||||
-rw-r--r-- | ydb/public/sdk/cpp/src/client/table/table.cpp | 17 |
8 files changed, 127 insertions, 1 deletions
diff --git a/ydb/core/kqp/provider/yql_kikimr_exec.cpp b/ydb/core/kqp/provider/yql_kikimr_exec.cpp index c5a1902a295..02872d11adc 100644 --- a/ydb/core/kqp/provider/yql_kikimr_exec.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_exec.cpp @@ -2066,6 +2066,12 @@ public: const auto interval = TDuration::FromValue(value); auto& resolvedTimestamps = *add_changefeed->mutable_resolved_timestamps_interval(); resolvedTimestamps.set_seconds(interval.Seconds()); + } else if (name == "schema_changes") { + auto value = TString( + setting.Value().Cast<TCoDataCtor>().Literal().Cast<TCoAtom>().Value() + ); + + add_changefeed->set_schema_changes(FromString<bool>(to_lower(value))); } else if (name == "retention_period") { YQL_ENSURE(setting.Value().Maybe<TCoInterval>()); const auto value = FromString<i64>( diff --git a/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp b/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp index 2058a356954..c54ba4078cb 100644 --- a/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp +++ b/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp @@ -5638,6 +5638,75 @@ Y_UNIT_TEST_SUITE(KqpScheme) { } } + Y_UNIT_TEST_TWIN(ChangefeedSchemaChanges, UseQueryService) { + TKikimrRunner kikimr(TKikimrSettings().SetPQConfig(DefaultPQConfig())); + auto queryClient = kikimr.GetQueryClient(); + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + + auto executeQuery = [&queryClient, &session](const TString& query) { + if constexpr (UseQueryService) { + Y_UNUSED(session); + return queryClient.ExecuteQuery(query, NQuery::TTxControl::NoTx()).ExtractValueSync(); + } else { + Y_UNUSED(queryClient); + return session.ExecuteSchemeQuery(query).ExtractValueSync(); + } + }; + + { + auto query = R"( + --!syntax_v1 + CREATE TABLE `/Root/table` ( + Key Uint64, + Value String, + PRIMARY KEY (Key) + ); + )"; + + auto result = executeQuery(query); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + } + + { // default (disabled) + auto query = R"( + --!syntax_v1 + ALTER TABLE `/Root/table` ADD CHANGEFEED `feed_1` WITH ( + MODE = 'KEYS_ONLY', FORMAT = 'JSON' + ); + )"; + + const auto result = executeQuery(query); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + + auto describeResult = session.DescribeTable("/Root/table").ExtractValueSync(); + UNIT_ASSERT_C(describeResult.IsSuccess(), describeResult.GetIssues().ToString()); + + const auto& changefeeds = describeResult.GetTableDescription().GetChangefeedDescriptions(); + UNIT_ASSERT_VALUES_EQUAL(changefeeds.size(), 1); + UNIT_ASSERT_VALUES_EQUAL(changefeeds.at(0).GetSchemaChanges(), false); + } + + { // enabled + auto query = R"( + --!syntax_v1 + ALTER TABLE `/Root/table` ADD CHANGEFEED `feed_2` WITH ( + MODE = 'KEYS_ONLY', FORMAT = 'JSON', SCHEMA_CHANGES = TRUE + ); + )"; + + const auto result = executeQuery(query); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + + auto describeResult = session.DescribeTable("/Root/table").ExtractValueSync(); + UNIT_ASSERT_C(describeResult.IsSuccess(), describeResult.GetIssues().ToString()); + + const auto& changefeeds = describeResult.GetTableDescription().GetChangefeedDescriptions(); + UNIT_ASSERT_VALUES_EQUAL(changefeeds.size(), 2); + UNIT_ASSERT_VALUES_EQUAL(changefeeds.at(1).GetSchemaChanges(), true); + } + } + Y_UNIT_TEST(ChangefeedRetentionPeriod) { using namespace NTopic; diff --git a/ydb/core/sys_view/show_create/create_table_formatter.cpp b/ydb/core/sys_view/show_create/create_table_formatter.cpp index 3e67898ec02..b21c4a70814 100644 --- a/ydb/core/sys_view/show_create/create_table_formatter.cpp +++ b/ydb/core/sys_view/show_create/create_table_formatter.cpp @@ -996,6 +996,11 @@ void TCreateTableFormatter::Format(const TString& tablePath, const NKikimrScheme del = ", "; } + if (cdcStream.GetSchemaChanges()) { + Stream << del << "SCHEMA_CHANGES = TRUE"; + del = ", "; + } + if (cdcStream.HasAwsRegion() && !cdcStream.GetAwsRegion().empty()) { Stream << del << "AWS_REGION = \'" << cdcStream.GetAwsRegion() << "\'"; del = ", "; diff --git a/ydb/core/sys_view/ut_kqp.cpp b/ydb/core/sys_view/ut_kqp.cpp index b01271a3201..6680b6c9cf1 100644 --- a/ydb/core/sys_view/ut_kqp.cpp +++ b/ydb/core/sys_view/ut_kqp.cpp @@ -1739,6 +1739,27 @@ ALTER TABLE `test_show_create` checker.CheckShowCreateTable(R"( CREATE TABLE test_show_create ( + Key String, + Value String, + PRIMARY KEY (Key) + ); + ALTER TABLE test_show_create + ADD CHANGEFEED `feed` WITH (MODE = 'KEYS_ONLY', FORMAT = 'JSON', SCHEMA_CHANGES = TRUE); + )", "test_show_create", +R"(CREATE TABLE `test_show_create` ( + `Key` String, + `Value` String, + PRIMARY KEY (`Key`) +); + +ALTER TABLE `test_show_create` + ADD CHANGEFEED `feed` WITH (MODE = 'KEYS_ONLY', FORMAT = 'JSON', SCHEMA_CHANGES = TRUE, RETENTION_PERIOD = INTERVAL('P1D')) +; +)" + ); + + checker.CheckShowCreateTable(R"( + CREATE TABLE test_show_create ( Key Uint64, Value String, PRIMARY KEY (Key) diff --git a/ydb/core/ydb_convert/table_description.cpp b/ydb/core/ydb_convert/table_description.cpp index 1293476a484..8f5ca9e86fc 100644 --- a/ydb/core/ydb_convert/table_description.cpp +++ b/ydb/core/ydb_convert/table_description.cpp @@ -1179,6 +1179,7 @@ void FillChangefeedDescription(Ydb::Table::ChangefeedDescription& out, out.set_name(in.GetName()); out.set_virtual_timestamps(in.GetVirtualTimestamps()); + out.set_schema_changes(in.GetSchemaChanges()); out.set_aws_region(in.GetAwsRegion()); if (const auto value = in.GetResolvedTimestampsIntervalMs()) { @@ -1244,6 +1245,7 @@ bool FillChangefeedDescriptionCommon(NKikimrSchemeOp::TCdcStreamDescription& out out.SetName(in.name()); out.SetVirtualTimestamps(in.virtual_timestamps()); + out.SetSchemaChanges(in.schema_changes()); out.SetAwsRegion(in.aws_region()); if (in.has_resolved_timestamps_interval()) { diff --git a/ydb/public/api/protos/ydb_table.proto b/ydb/public/api/protos/ydb_table.proto index e34976efaf2..5823e531c6f 100644 --- a/ydb/public/api/protos/ydb_table.proto +++ b/ydb/public/api/protos/ydb_table.proto @@ -233,6 +233,8 @@ message Changefeed { google.protobuf.Duration resolved_timestamps_interval = 9; // Partitioning settings of underlying topic. Topic.PartitioningSettings topic_partitioning_settings = 10; + // Emit schema change events or not + bool schema_changes = 11; } message ChangefeedDescription { @@ -274,6 +276,8 @@ message ChangefeedDescription { google.protobuf.Duration resolved_timestamps_interval = 8; // Progress of initial scan. If unspecified, initial scan was not launched. InitialScanProgress initial_scan_progress = 9; + // State of emitting of schema change events + bool schema_changes = 10; } message StoragePool { diff --git a/ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/table/table.h b/ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/table/table.h index b9680dcca44..01bccbc00c0 100644 --- a/ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/table/table.h +++ b/ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/table/table.h @@ -397,6 +397,8 @@ public: // Enable virtual timestamps TChangefeedDescription& WithVirtualTimestamps(); + // Enable schema changes + TChangefeedDescription& WithSchemaChanges(); // Enable resolved timestamps TChangefeedDescription& WithResolvedTimestamps(const TDuration& interval); // Customise retention period of underlying topic (24h by default). @@ -415,6 +417,7 @@ public: EChangefeedFormat GetFormat() const; EChangefeedState GetState() const; bool GetVirtualTimestamps() const; + bool GetSchemaChanges() const; const std::optional<TDuration>& GetResolvedTimestamps() const; bool GetInitialScan() const; const std::unordered_map<std::string, std::string>& GetAttributes() const; @@ -442,6 +445,7 @@ private: EChangefeedFormat Format_; EChangefeedState State_ = EChangefeedState::Unknown; bool VirtualTimestamps_ = false; + bool SchemaChanges_ = false; std::optional<TDuration> ResolvedTimestamps_; std::optional<TDuration> RetentionPeriod_; bool InitialScan_ = false; diff --git a/ydb/public/sdk/cpp/src/client/table/table.cpp b/ydb/public/sdk/cpp/src/client/table/table.cpp index 147f9e7b4c1..65b271ddc04 100644 --- a/ydb/public/sdk/cpp/src/client/table/table.cpp +++ b/ydb/public/sdk/cpp/src/client/table/table.cpp @@ -2687,6 +2687,11 @@ TChangefeedDescription& TChangefeedDescription::WithVirtualTimestamps() { return *this; } +TChangefeedDescription& TChangefeedDescription::WithSchemaChanges() { + SchemaChanges_ = true; + return *this; +} + TChangefeedDescription& TChangefeedDescription::WithResolvedTimestamps(const TDuration& value) { ResolvedTimestamps_ = value; return *this; @@ -2742,6 +2747,10 @@ bool TChangefeedDescription::GetVirtualTimestamps() const { return VirtualTimestamps_; } +bool TChangefeedDescription::GetSchemaChanges() const { + return SchemaChanges_; +} + const std::optional<TDuration>& TChangefeedDescription::GetResolvedTimestamps() const { return ResolvedTimestamps_; } @@ -2806,6 +2815,9 @@ TChangefeedDescription TChangefeedDescription::FromProto(const TProto& proto) { if (proto.virtual_timestamps()) { ret.WithVirtualTimestamps(); } + if (proto.schema_changes()) { + ret.WithSchemaChanges(); + } if (proto.has_resolved_timestamps_interval()) { ret.WithResolvedTimestamps(TDuration::MilliSeconds( ::google::protobuf::util::TimeUtil::DurationToMilliseconds(proto.resolved_timestamps_interval()))); @@ -2849,6 +2861,7 @@ template <typename TProto> void TChangefeedDescription::SerializeCommonFields(TProto& proto) const { proto.set_name(TStringType{Name_}); proto.set_virtual_timestamps(VirtualTimestamps_); + proto.set_schema_changes(SchemaChanges_); proto.set_aws_region(TStringType{AwsRegion_}); switch (Mode_) { @@ -2932,7 +2945,8 @@ void TChangefeedDescription::Out(IOutputStream& o) const { o << "{ name: \"" << Name_ << "\"" << ", mode: " << Mode_ << "" << ", format: " << Format_ << "" - << ", virtual_timestamps: " << (VirtualTimestamps_ ? "on": "off") << ""; + << ", virtual_timestamps: " << (VirtualTimestamps_ ? "on": "off") << "" + << ", schema_changes: " << (SchemaChanges_ ? "on": "off") << ""; if (ResolvedTimestamps_) { o << ", resolved_timestamps: " << *ResolvedTimestamps_; @@ -2958,6 +2972,7 @@ bool operator==(const TChangefeedDescription& lhs, const TChangefeedDescription& && lhs.GetMode() == rhs.GetMode() && lhs.GetFormat() == rhs.GetFormat() && lhs.GetVirtualTimestamps() == rhs.GetVirtualTimestamps() + && lhs.GetSchemaChanges() == rhs.GetSchemaChanges() && lhs.GetResolvedTimestamps() == rhs.GetResolvedTimestamps() && lhs.GetAwsRegion() == rhs.GetAwsRegion(); } |