aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorIlnaz Nizametdinov <ilnaz@ydb.tech>2025-06-05 00:38:48 +0300
committerGitHub <noreply@github.com>2025-06-04 21:38:48 +0000
commit8bbfc1965ae5de65c96c36238b45ec33dd84f0a0 (patch)
tree4055764945a13a0d8beadbcea7a844e2f1064b32
parentc0ca0fdd4f20f30d9dee09caebe5e953b4827a1b (diff)
downloadydb-8bbfc1965ae5de65c96c36238b45ec33dd84f0a0.tar.gz
New CHANGEFEED's option: SCHEMA_CHANGES (#19303)
-rw-r--r--ydb/core/kqp/provider/yql_kikimr_exec.cpp6
-rw-r--r--ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp69
-rw-r--r--ydb/core/sys_view/show_create/create_table_formatter.cpp5
-rw-r--r--ydb/core/sys_view/ut_kqp.cpp21
-rw-r--r--ydb/core/ydb_convert/table_description.cpp2
-rw-r--r--ydb/public/api/protos/ydb_table.proto4
-rw-r--r--ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/table/table.h4
-rw-r--r--ydb/public/sdk/cpp/src/client/table/table.cpp17
8 files changed, 127 insertions, 1 deletions
diff --git a/ydb/core/kqp/provider/yql_kikimr_exec.cpp b/ydb/core/kqp/provider/yql_kikimr_exec.cpp
index c5a1902a295..02872d11adc 100644
--- a/ydb/core/kqp/provider/yql_kikimr_exec.cpp
+++ b/ydb/core/kqp/provider/yql_kikimr_exec.cpp
@@ -2066,6 +2066,12 @@ public:
const auto interval = TDuration::FromValue(value);
auto& resolvedTimestamps = *add_changefeed->mutable_resolved_timestamps_interval();
resolvedTimestamps.set_seconds(interval.Seconds());
+ } else if (name == "schema_changes") {
+ auto value = TString(
+ setting.Value().Cast<TCoDataCtor>().Literal().Cast<TCoAtom>().Value()
+ );
+
+ add_changefeed->set_schema_changes(FromString<bool>(to_lower(value)));
} else if (name == "retention_period") {
YQL_ENSURE(setting.Value().Maybe<TCoInterval>());
const auto value = FromString<i64>(
diff --git a/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp b/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp
index 2058a356954..c54ba4078cb 100644
--- a/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp
+++ b/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp
@@ -5638,6 +5638,75 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
}
}
+ Y_UNIT_TEST_TWIN(ChangefeedSchemaChanges, UseQueryService) {
+ TKikimrRunner kikimr(TKikimrSettings().SetPQConfig(DefaultPQConfig()));
+ auto queryClient = kikimr.GetQueryClient();
+ auto db = kikimr.GetTableClient();
+ auto session = db.CreateSession().GetValueSync().GetSession();
+
+ auto executeQuery = [&queryClient, &session](const TString& query) {
+ if constexpr (UseQueryService) {
+ Y_UNUSED(session);
+ return queryClient.ExecuteQuery(query, NQuery::TTxControl::NoTx()).ExtractValueSync();
+ } else {
+ Y_UNUSED(queryClient);
+ return session.ExecuteSchemeQuery(query).ExtractValueSync();
+ }
+ };
+
+ {
+ auto query = R"(
+ --!syntax_v1
+ CREATE TABLE `/Root/table` (
+ Key Uint64,
+ Value String,
+ PRIMARY KEY (Key)
+ );
+ )";
+
+ auto result = executeQuery(query);
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ }
+
+ { // default (disabled)
+ auto query = R"(
+ --!syntax_v1
+ ALTER TABLE `/Root/table` ADD CHANGEFEED `feed_1` WITH (
+ MODE = 'KEYS_ONLY', FORMAT = 'JSON'
+ );
+ )";
+
+ const auto result = executeQuery(query);
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+
+ auto describeResult = session.DescribeTable("/Root/table").ExtractValueSync();
+ UNIT_ASSERT_C(describeResult.IsSuccess(), describeResult.GetIssues().ToString());
+
+ const auto& changefeeds = describeResult.GetTableDescription().GetChangefeedDescriptions();
+ UNIT_ASSERT_VALUES_EQUAL(changefeeds.size(), 1);
+ UNIT_ASSERT_VALUES_EQUAL(changefeeds.at(0).GetSchemaChanges(), false);
+ }
+
+ { // enabled
+ auto query = R"(
+ --!syntax_v1
+ ALTER TABLE `/Root/table` ADD CHANGEFEED `feed_2` WITH (
+ MODE = 'KEYS_ONLY', FORMAT = 'JSON', SCHEMA_CHANGES = TRUE
+ );
+ )";
+
+ const auto result = executeQuery(query);
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+
+ auto describeResult = session.DescribeTable("/Root/table").ExtractValueSync();
+ UNIT_ASSERT_C(describeResult.IsSuccess(), describeResult.GetIssues().ToString());
+
+ const auto& changefeeds = describeResult.GetTableDescription().GetChangefeedDescriptions();
+ UNIT_ASSERT_VALUES_EQUAL(changefeeds.size(), 2);
+ UNIT_ASSERT_VALUES_EQUAL(changefeeds.at(1).GetSchemaChanges(), true);
+ }
+ }
+
Y_UNIT_TEST(ChangefeedRetentionPeriod) {
using namespace NTopic;
diff --git a/ydb/core/sys_view/show_create/create_table_formatter.cpp b/ydb/core/sys_view/show_create/create_table_formatter.cpp
index 3e67898ec02..b21c4a70814 100644
--- a/ydb/core/sys_view/show_create/create_table_formatter.cpp
+++ b/ydb/core/sys_view/show_create/create_table_formatter.cpp
@@ -996,6 +996,11 @@ void TCreateTableFormatter::Format(const TString& tablePath, const NKikimrScheme
del = ", ";
}
+ if (cdcStream.GetSchemaChanges()) {
+ Stream << del << "SCHEMA_CHANGES = TRUE";
+ del = ", ";
+ }
+
if (cdcStream.HasAwsRegion() && !cdcStream.GetAwsRegion().empty()) {
Stream << del << "AWS_REGION = \'" << cdcStream.GetAwsRegion() << "\'";
del = ", ";
diff --git a/ydb/core/sys_view/ut_kqp.cpp b/ydb/core/sys_view/ut_kqp.cpp
index b01271a3201..6680b6c9cf1 100644
--- a/ydb/core/sys_view/ut_kqp.cpp
+++ b/ydb/core/sys_view/ut_kqp.cpp
@@ -1739,6 +1739,27 @@ ALTER TABLE `test_show_create`
checker.CheckShowCreateTable(R"(
CREATE TABLE test_show_create (
+ Key String,
+ Value String,
+ PRIMARY KEY (Key)
+ );
+ ALTER TABLE test_show_create
+ ADD CHANGEFEED `feed` WITH (MODE = 'KEYS_ONLY', FORMAT = 'JSON', SCHEMA_CHANGES = TRUE);
+ )", "test_show_create",
+R"(CREATE TABLE `test_show_create` (
+ `Key` String,
+ `Value` String,
+ PRIMARY KEY (`Key`)
+);
+
+ALTER TABLE `test_show_create`
+ ADD CHANGEFEED `feed` WITH (MODE = 'KEYS_ONLY', FORMAT = 'JSON', SCHEMA_CHANGES = TRUE, RETENTION_PERIOD = INTERVAL('P1D'))
+;
+)"
+ );
+
+ checker.CheckShowCreateTable(R"(
+ CREATE TABLE test_show_create (
Key Uint64,
Value String,
PRIMARY KEY (Key)
diff --git a/ydb/core/ydb_convert/table_description.cpp b/ydb/core/ydb_convert/table_description.cpp
index 1293476a484..8f5ca9e86fc 100644
--- a/ydb/core/ydb_convert/table_description.cpp
+++ b/ydb/core/ydb_convert/table_description.cpp
@@ -1179,6 +1179,7 @@ void FillChangefeedDescription(Ydb::Table::ChangefeedDescription& out,
out.set_name(in.GetName());
out.set_virtual_timestamps(in.GetVirtualTimestamps());
+ out.set_schema_changes(in.GetSchemaChanges());
out.set_aws_region(in.GetAwsRegion());
if (const auto value = in.GetResolvedTimestampsIntervalMs()) {
@@ -1244,6 +1245,7 @@ bool FillChangefeedDescriptionCommon(NKikimrSchemeOp::TCdcStreamDescription& out
out.SetName(in.name());
out.SetVirtualTimestamps(in.virtual_timestamps());
+ out.SetSchemaChanges(in.schema_changes());
out.SetAwsRegion(in.aws_region());
if (in.has_resolved_timestamps_interval()) {
diff --git a/ydb/public/api/protos/ydb_table.proto b/ydb/public/api/protos/ydb_table.proto
index e34976efaf2..5823e531c6f 100644
--- a/ydb/public/api/protos/ydb_table.proto
+++ b/ydb/public/api/protos/ydb_table.proto
@@ -233,6 +233,8 @@ message Changefeed {
google.protobuf.Duration resolved_timestamps_interval = 9;
// Partitioning settings of underlying topic.
Topic.PartitioningSettings topic_partitioning_settings = 10;
+ // Emit schema change events or not
+ bool schema_changes = 11;
}
message ChangefeedDescription {
@@ -274,6 +276,8 @@ message ChangefeedDescription {
google.protobuf.Duration resolved_timestamps_interval = 8;
// Progress of initial scan. If unspecified, initial scan was not launched.
InitialScanProgress initial_scan_progress = 9;
+ // State of emitting of schema change events
+ bool schema_changes = 10;
}
message StoragePool {
diff --git a/ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/table/table.h b/ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/table/table.h
index b9680dcca44..01bccbc00c0 100644
--- a/ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/table/table.h
+++ b/ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/table/table.h
@@ -397,6 +397,8 @@ public:
// Enable virtual timestamps
TChangefeedDescription& WithVirtualTimestamps();
+ // Enable schema changes
+ TChangefeedDescription& WithSchemaChanges();
// Enable resolved timestamps
TChangefeedDescription& WithResolvedTimestamps(const TDuration& interval);
// Customise retention period of underlying topic (24h by default).
@@ -415,6 +417,7 @@ public:
EChangefeedFormat GetFormat() const;
EChangefeedState GetState() const;
bool GetVirtualTimestamps() const;
+ bool GetSchemaChanges() const;
const std::optional<TDuration>& GetResolvedTimestamps() const;
bool GetInitialScan() const;
const std::unordered_map<std::string, std::string>& GetAttributes() const;
@@ -442,6 +445,7 @@ private:
EChangefeedFormat Format_;
EChangefeedState State_ = EChangefeedState::Unknown;
bool VirtualTimestamps_ = false;
+ bool SchemaChanges_ = false;
std::optional<TDuration> ResolvedTimestamps_;
std::optional<TDuration> RetentionPeriod_;
bool InitialScan_ = false;
diff --git a/ydb/public/sdk/cpp/src/client/table/table.cpp b/ydb/public/sdk/cpp/src/client/table/table.cpp
index 147f9e7b4c1..65b271ddc04 100644
--- a/ydb/public/sdk/cpp/src/client/table/table.cpp
+++ b/ydb/public/sdk/cpp/src/client/table/table.cpp
@@ -2687,6 +2687,11 @@ TChangefeedDescription& TChangefeedDescription::WithVirtualTimestamps() {
return *this;
}
+TChangefeedDescription& TChangefeedDescription::WithSchemaChanges() {
+ SchemaChanges_ = true;
+ return *this;
+}
+
TChangefeedDescription& TChangefeedDescription::WithResolvedTimestamps(const TDuration& value) {
ResolvedTimestamps_ = value;
return *this;
@@ -2742,6 +2747,10 @@ bool TChangefeedDescription::GetVirtualTimestamps() const {
return VirtualTimestamps_;
}
+bool TChangefeedDescription::GetSchemaChanges() const {
+ return SchemaChanges_;
+}
+
const std::optional<TDuration>& TChangefeedDescription::GetResolvedTimestamps() const {
return ResolvedTimestamps_;
}
@@ -2806,6 +2815,9 @@ TChangefeedDescription TChangefeedDescription::FromProto(const TProto& proto) {
if (proto.virtual_timestamps()) {
ret.WithVirtualTimestamps();
}
+ if (proto.schema_changes()) {
+ ret.WithSchemaChanges();
+ }
if (proto.has_resolved_timestamps_interval()) {
ret.WithResolvedTimestamps(TDuration::MilliSeconds(
::google::protobuf::util::TimeUtil::DurationToMilliseconds(proto.resolved_timestamps_interval())));
@@ -2849,6 +2861,7 @@ template <typename TProto>
void TChangefeedDescription::SerializeCommonFields(TProto& proto) const {
proto.set_name(TStringType{Name_});
proto.set_virtual_timestamps(VirtualTimestamps_);
+ proto.set_schema_changes(SchemaChanges_);
proto.set_aws_region(TStringType{AwsRegion_});
switch (Mode_) {
@@ -2932,7 +2945,8 @@ void TChangefeedDescription::Out(IOutputStream& o) const {
o << "{ name: \"" << Name_ << "\""
<< ", mode: " << Mode_ << ""
<< ", format: " << Format_ << ""
- << ", virtual_timestamps: " << (VirtualTimestamps_ ? "on": "off") << "";
+ << ", virtual_timestamps: " << (VirtualTimestamps_ ? "on": "off") << ""
+ << ", schema_changes: " << (SchemaChanges_ ? "on": "off") << "";
if (ResolvedTimestamps_) {
o << ", resolved_timestamps: " << *ResolvedTimestamps_;
@@ -2958,6 +2972,7 @@ bool operator==(const TChangefeedDescription& lhs, const TChangefeedDescription&
&& lhs.GetMode() == rhs.GetMode()
&& lhs.GetFormat() == rhs.GetFormat()
&& lhs.GetVirtualTimestamps() == rhs.GetVirtualTimestamps()
+ && lhs.GetSchemaChanges() == rhs.GetSchemaChanges()
&& lhs.GetResolvedTimestamps() == rhs.GetResolvedTimestamps()
&& lhs.GetAwsRegion() == rhs.GetAwsRegion();
}