diff options
author | ilnaz <ilnaz@ydb.tech> | 2023-10-13 12:51:48 +0300 |
---|---|---|
committer | ilnaz <ilnaz@ydb.tech> | 2023-10-13 13:40:59 +0300 |
commit | 9d5156a9810bb20f3c0406268a8beb2701a114c3 (patch) | |
tree | aab9009126051aa4c864be6b3a320ecda2cd12d3 | |
parent | d8f0ea2538f0f50147d3d4c5fcbe0297b504ab12 (diff) | |
download | ydb-9d5156a9810bb20f3c0406268a8beb2701a114c3.tar.gz |
Explicit number of changefeed's underlying topic partitions KIKIMR-19460
-rw-r--r-- | ydb/core/grpc_services/rpc_alter_table.cpp | 13 | ||||
-rw-r--r-- | ydb/core/kqp/provider/yql_kikimr_exec.cpp | 13 | ||||
-rw-r--r-- | ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp | 109 | ||||
-rw-r--r-- | ydb/core/protos/flat_scheme_op.proto | 1 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp | 67 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/ut_cdc_stream/ut_cdc_stream.cpp | 58 | ||||
-rw-r--r-- | ydb/library/yql/sql/v1/format/sql_format_ut.cpp | 2 | ||||
-rw-r--r-- | ydb/library/yql/sql/v1/node.h | 1 | ||||
-rw-r--r-- | ydb/library/yql/sql/v1/query.cpp | 3 | ||||
-rw-r--r-- | ydb/library/yql/sql/v1/sql_expression.cpp | 6 | ||||
-rw-r--r-- | ydb/library/yql/sql/v1/sql_ut.cpp | 15 | ||||
-rw-r--r-- | ydb/public/api/protos/ydb_table.proto | 3 |
12 files changed, 269 insertions, 22 deletions
diff --git a/ydb/core/grpc_services/rpc_alter_table.cpp b/ydb/core/grpc_services/rpc_alter_table.cpp index e82991093d1..fe01512b5aa 100644 --- a/ydb/core/grpc_services/rpc_alter_table.cpp +++ b/ydb/core/grpc_services/rpc_alter_table.cpp @@ -498,10 +498,23 @@ private: for (const auto& add : req->add_changefeeds()) { auto op = modifyScheme->MutableCreateCdcStream(); op->SetTableName(name); + if (add.has_retention_period()) { op->SetRetentionPeriodSeconds(add.retention_period().seconds()); } + if (add.has_topic_partitioning_settings()) { + i64 minActivePartitions = add.topic_partitioning_settings().min_active_partitions(); + if (minActivePartitions < 0) { + NYql::TIssues issues; + issues.AddIssue(NYql::TIssue("Topic partitions count must be positive")); + return Reply(Ydb::StatusIds::BAD_REQUEST, issues, ctx); + } else if (minActivePartitions == 0) { + minActivePartitions = 1; + } + op->SetTopicPartitions(minActivePartitions); + } + StatusIds::StatusCode code; TString error; if (!FillChangefeedDescription(*op->MutableStreamDescription(), add, code, error)) { diff --git a/ydb/core/kqp/provider/yql_kikimr_exec.cpp b/ydb/core/kqp/provider/yql_kikimr_exec.cpp index 81c21614d4a..4242f27a146 100644 --- a/ydb/core/kqp/provider/yql_kikimr_exec.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_exec.cpp @@ -1442,6 +1442,19 @@ public: const auto duration = TDuration::FromValue(value); auto& retention = *add_changefeed->mutable_retention_period(); retention.set_seconds(duration.Seconds()); + } else if (name == "topic_min_active_partitions") { + auto value = TString( + setting.Value().Cast<TCoDataCtor>().Literal().Cast<TCoAtom>().Value() + ); + + i64 minActivePartitions; + if (!TryFromString(value, minActivePartitions) || minActivePartitions <= 0) { + ctx.AddError(TIssue(ctx.GetPosition(setting.Name().Pos()), + TStringBuilder() << name << " must be greater than 0")); + return SyncError(); + } + + add_changefeed->mutable_topic_partitioning_settings()->set_min_active_partitions(minActivePartitions); } else if (name == "aws_region") { auto value = TString( setting.Value().Cast<TCoDataCtor>().Literal().Cast<TCoAtom>().Value() diff --git a/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp b/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp index 74af347cdc2..3fe92a66fb1 100644 --- a/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp +++ b/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp @@ -3808,6 +3808,115 @@ Y_UNIT_TEST_SUITE(KqpScheme) { } } + Y_UNIT_TEST(ChangefeedTopicPartitions) { + using namespace NTopic; + + TKikimrRunner kikimr(TKikimrSettings().SetPQConfig(DefaultPQConfig())); + auto pq = TTopicClient(kikimr.GetDriver(), TTopicClientSettings().Database("/Root")); + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + + { // Uint64 key + auto query = R"( + --!syntax_v1 + CREATE TABLE `/Root/table_1` ( + Key Uint64, + Value String, + PRIMARY KEY (Key) + ); + )"; + + auto result = session.ExecuteSchemeQuery(query).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + } + + { // default + auto query = R"( + --!syntax_v1 + ALTER TABLE `/Root/table_1` ADD CHANGEFEED `feed_1` WITH ( + MODE = 'KEYS_ONLY', FORMAT = 'JSON' + ); + )"; + + const auto result = session.ExecuteSchemeQuery(query).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + + auto desc = pq.DescribeTopic("/Root/table_1/feed_1").ExtractValueSync(); + UNIT_ASSERT_C(desc.IsSuccess(), desc.GetIssues().ToString()); + UNIT_ASSERT_VALUES_EQUAL(desc.GetTopicDescription().GetPartitions().size(), 1); + } + + { // custom + auto query = R"( + --!syntax_v1 + ALTER TABLE `/Root/table_1` ADD CHANGEFEED `feed_2` WITH ( + MODE = 'KEYS_ONLY', FORMAT = 'JSON', TOPIC_MIN_ACTIVE_PARTITIONS = 10 + ); + )"; + + const auto result = session.ExecuteSchemeQuery(query).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + + auto desc = pq.DescribeTopic("/Root/table_1/feed_2").ExtractValueSync(); + UNIT_ASSERT_C(desc.IsSuccess(), desc.GetIssues().ToString()); + UNIT_ASSERT_VALUES_EQUAL(desc.GetTopicDescription().GetPartitions().size(), 10); + } + + { // non-positive (invalid) + auto query = R"( + --!syntax_v1 + ALTER TABLE `/Root/table_1` ADD CHANGEFEED `feed_3` WITH ( + MODE = 'KEYS_ONLY', FORMAT = 'JSON', TOPIC_MIN_ACTIVE_PARTITIONS = 0 + ); + )"; + + const auto result = session.ExecuteSchemeQuery(query).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::GENERIC_ERROR, result.GetIssues().ToString()); + } + + { // Utf8 key + auto query = R"( + --!syntax_v1 + CREATE TABLE `/Root/table_2` ( + Key Utf8, + Value String, + PRIMARY KEY (Key) + ); + )"; + + auto result = session.ExecuteSchemeQuery(query).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + } + + { // default + auto query = R"( + --!syntax_v1 + ALTER TABLE `/Root/table_2` ADD CHANGEFEED `feed_1` WITH ( + MODE = 'KEYS_ONLY', FORMAT = 'JSON' + ); + )"; + + const auto result = session.ExecuteSchemeQuery(query).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + + auto desc = pq.DescribeTopic("/Root/table_2/feed_1").ExtractValueSync(); + UNIT_ASSERT_C(desc.IsSuccess(), desc.GetIssues().ToString()); + UNIT_ASSERT_VALUES_EQUAL(desc.GetTopicDescription().GetPartitions().size(), 1); + } + + { // custom + auto query = R"( + --!syntax_v1 + ALTER TABLE `/Root/table_2` ADD CHANGEFEED `feed_2` WITH ( + MODE = 'KEYS_ONLY', FORMAT = 'JSON', TOPIC_MIN_ACTIVE_PARTITIONS = 10 + ); + )"; + + const auto result = session.ExecuteSchemeQuery(query).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::BAD_REQUEST, result.GetIssues().ToString()); + } + } + Y_UNIT_TEST(ChangefeedAttributes) { TKikimrRunner kikimr(TKikimrSettings().SetPQConfig(DefaultPQConfig())); auto db = kikimr.GetTableClient(); diff --git a/ydb/core/protos/flat_scheme_op.proto b/ydb/core/protos/flat_scheme_op.proto index f6ece2ed23c..8d02114ab4d 100644 --- a/ydb/core/protos/flat_scheme_op.proto +++ b/ydb/core/protos/flat_scheme_op.proto @@ -761,6 +761,7 @@ message TCreateCdcStream { optional string TableName = 1; optional TCdcStreamDescription StreamDescription = 2; optional uint64 RetentionPeriodSeconds = 3 [default = 86400]; // 1d by default + optional uint32 TopicPartitions = 4; } message TAlterCdcStream { diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp index 9dc0d4b9672..96bf95347cd 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp @@ -723,6 +723,37 @@ TVector<ISubOperation::TPtr> CreateNewCdcStream(TOperationId opId, const TTxTran << "Initial scan is not supported yet")}; } + Y_ABORT_UNLESS(context.SS->Tables.contains(tablePath.Base()->PathId)); + auto table = context.SS->Tables.at(tablePath.Base()->PathId); + + TVector<TString> boundaries; + if (op.HasTopicPartitions()) { + if (op.GetTopicPartitions() <= 0) { + return {CreateReject(opId, NKikimrScheme::StatusInvalidParameter, "Topic partitions count must be greater than 0")}; + } + + const auto& keyColumns = table->KeyColumnIds; + const auto& columns = table->Columns; + + Y_ABORT_UNLESS(!keyColumns.empty()); + Y_ABORT_UNLESS(columns.contains(keyColumns.at(0))); + const auto firstKeyColumnType = columns.at(keyColumns.at(0)).PType; + + if (!TSchemeShard::FillUniformPartitioning(boundaries, keyColumns.size(), firstKeyColumnType, op.GetTopicPartitions(), AppData()->TypeRegistry, errStr)) { + return {CreateReject(opId, NKikimrScheme::StatusInvalidParameter, errStr)}; + } + } else { + const auto& partitions = table->GetPartitions(); + boundaries.reserve(partitions.size() - 1); + + for (ui32 i = 0; i < partitions.size(); ++i) { + const auto& partition = partitions.at(i); + if (i != partitions.size() - 1) { + boundaries.push_back(partition.EndOfRange); + } + } + } + TVector<ISubOperation::TPtr> result; if (initialScan) { @@ -759,16 +790,12 @@ TVector<ISubOperation::TPtr> CreateNewCdcStream(TOperationId opId, const TTxTran } { - Y_ABORT_UNLESS(context.SS->Tables.contains(tablePath.Base()->PathId)); - auto table = context.SS->Tables.at(tablePath.Base()->PathId); - const auto& partitions = table->GetPartitions(); - auto outTx = TransactionTemplate(streamPath.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpCreatePersQueueGroup); outTx.SetFailOnExist(!acceptExisted); auto& desc = *outTx.MutableCreatePersQueueGroup(); desc.SetName("streamImpl"); - desc.SetTotalGroupCount(partitions.size()); + desc.SetTotalGroupCount(op.HasTopicPartitions() ? op.GetTopicPartitions() : table->GetPartitions().size()); desc.SetPartitionPerTablet(2); auto& pqConfig = *desc.MutablePQTabletConfig(); @@ -795,23 +822,19 @@ TVector<ISubOperation::TPtr> CreateNewCdcStream(TOperationId opId, const TTxTran } } - for (ui32 i = 0; i < partitions.size(); ++i) { - const auto& cur = partitions.at(i); - - if (i != partitions.size() - 1) { - TSerializedCellVec endKey(cur.EndOfRange); - Y_ABORT_UNLESS(endKey.GetCells().size() <= table->KeyColumnIds.size()); - - TString errStr; - auto& boundary = *desc.AddPartitionBoundaries(); - for (ui32 ki = 0; ki < endKey.GetCells().size(); ++ki) { - const auto& cell = endKey.GetCells()[ki]; - const auto tag = table->KeyColumnIds.at(ki); - Y_ABORT_UNLESS(table->Columns.contains(tag)); - const auto typeId = table->Columns.at(tag).PType; - const bool ok = NMiniKQL::CellToValue(typeId, cell, *boundary.AddTuple(), errStr); - Y_ABORT_UNLESS(ok, "Failed to build key tuple at position %" PRIu32 " error: %s", ki, errStr.data()); - } + for (const auto& serialized : boundaries) { + TSerializedCellVec endKey(serialized); + Y_ABORT_UNLESS(endKey.GetCells().size() <= table->KeyColumnIds.size()); + + TString errStr; + auto& boundary = *desc.AddPartitionBoundaries(); + for (ui32 ki = 0; ki < endKey.GetCells().size(); ++ki) { + const auto& cell = endKey.GetCells()[ki]; + const auto tag = table->KeyColumnIds.at(ki); + Y_ABORT_UNLESS(table->Columns.contains(tag)); + const auto typeId = table->Columns.at(tag).PType; + const bool ok = NMiniKQL::CellToValue(typeId, cell, *boundary.AddTuple(), errStr); + Y_ABORT_UNLESS(ok, "Failed to build key tuple at position %" PRIu32 " error: %s", ki, errStr.data()); } } diff --git a/ydb/core/tx/schemeshard/ut_cdc_stream/ut_cdc_stream.cpp b/ydb/core/tx/schemeshard/ut_cdc_stream/ut_cdc_stream.cpp index 3cd979308ef..97385b36f82 100644 --- a/ydb/core/tx/schemeshard/ut_cdc_stream/ut_cdc_stream.cpp +++ b/ydb/core/tx/schemeshard/ut_cdc_stream/ut_cdc_stream.cpp @@ -172,6 +172,64 @@ Y_UNIT_TEST_SUITE(TCdcStreamTests) { } } + Y_UNIT_TEST(TopicPartitions) { + TTestBasicRuntime runtime; + TTestEnv env(runtime, TTestEnvOptions().EnableProtoSourceIdInfo(true)); + ui64 txId = 100; + + for (const auto& keyType : TVector<TString>{"Uint64", "Uint32", "Utf8"}) { + const auto status = keyType != "Utf8" + ? NKikimrScheme::StatusAccepted + : NKikimrScheme::StatusInvalidParameter; + + TestCreateTable(runtime, ++txId, "/MyRoot", Sprintf(R"( + Name: "Table%s" + Columns { Name: "key" Type: "%s" } + Columns { Name: "value" Type: "Utf8" } + KeyColumnNames: ["key"] + )", keyType.c_str(), keyType.c_str())); + env.TestWaitNotification(runtime, txId); + + TestCreateCdcStream(runtime, ++txId, "/MyRoot", Sprintf(R"( + TableName: "Table%s" + StreamDescription { + Name: "Stream" + Mode: ECdcStreamModeKeysOnly + Format: ECdcStreamFormatProto + } + TopicPartitions: 10 + )", keyType.c_str()), {status}); + + if (status != NKikimrScheme::StatusAccepted) { + continue; + } + + env.TestWaitNotification(runtime, txId); + TestDescribeResult(DescribePrivatePath(runtime, Sprintf("/MyRoot/Table%s/Stream/streamImpl", keyType.c_str())), { + NLs::PathExist, + NLs::CheckPartCount("streamImpl", 10, 2, 5, 10), + }); + } + + TestCreateTable(runtime, ++txId, "/MyRoot", R"( + Name: "Table" + Columns { Name: "key" Type: "Uint64" } + Columns { Name: "value" Type: "Utf8" } + KeyColumnNames: ["key"] + )"); + env.TestWaitNotification(runtime, txId); + + TestCreateCdcStream(runtime, ++txId, "/MyRoot", R"( + TableName: "Table" + StreamDescription { + Name: "Stream" + Mode: ECdcStreamModeKeysOnly + Format: ECdcStreamFormatProto + } + TopicPartitions: 0 + )", {NKikimrScheme::StatusInvalidParameter}); + } + Y_UNIT_TEST(Attributes) { TTestBasicRuntime runtime; TTestEnv env(runtime, TTestEnvOptions().EnableProtoSourceIdInfo(true)); diff --git a/ydb/library/yql/sql/v1/format/sql_format_ut.cpp b/ydb/library/yql/sql/v1/format/sql_format_ut.cpp index 14c7693c04a..fbb205bb16b 100644 --- a/ydb/library/yql/sql/v1/format/sql_format_ut.cpp +++ b/ydb/library/yql/sql/v1/format/sql_format_ut.cpp @@ -441,6 +441,8 @@ Y_UNIT_TEST_SUITE(CheckSqlFormatter) { "ALTER TABLE user\n\tADD CHANGEFEED user WITH (virtual_timestamps = FALSE);\n\n"}, {"alter table user add changefeed user with (resolved_timestamps = Interval(\"PT1S\"))", "ALTER TABLE user\n\tADD CHANGEFEED user WITH (resolved_timestamps = Interval(\"PT1S\"));\n\n"}, + {"alter table user add changefeed user with (topic_min_active_partitions = 1)", + "ALTER TABLE user\n\tADD CHANGEFEED user WITH (topic_min_active_partitions = 1);\n\n"}, }; TSetup setup; diff --git a/ydb/library/yql/sql/v1/node.h b/ydb/library/yql/sql/v1/node.h index 7dd8590d621..0c2b6bd141e 100644 --- a/ydb/library/yql/sql/v1/node.h +++ b/ydb/library/yql/sql/v1/node.h @@ -1023,6 +1023,7 @@ namespace NSQLTranslationV1 { TNodePtr VirtualTimestamps; TNodePtr ResolvedTimestamps; TNodePtr RetentionPeriod; + TNodePtr TopicPartitions; TNodePtr AwsRegion; std::optional<std::variant<TLocalSinkSettings>> SinkSettings; }; diff --git a/ydb/library/yql/sql/v1/query.cpp b/ydb/library/yql/sql/v1/query.cpp index c37489868b2..feedcf290ce 100644 --- a/ydb/library/yql/sql/v1/query.cpp +++ b/ydb/library/yql/sql/v1/query.cpp @@ -187,6 +187,9 @@ static INode::TPtr CreateChangefeedDesc(const TChangefeedDescription& desc, cons if (desc.Settings.RetentionPeriod) { settings = node.L(settings, node.Q(node.Y(node.Q("retention_period"), desc.Settings.RetentionPeriod))); } + if (desc.Settings.TopicPartitions) { + settings = node.L(settings, node.Q(node.Y(node.Q("topic_min_active_partitions"), desc.Settings.TopicPartitions))); + } if (desc.Settings.AwsRegion) { settings = node.L(settings, node.Q(node.Y(node.Q("aws_region"), desc.Settings.AwsRegion))); } diff --git a/ydb/library/yql/sql/v1/sql_expression.cpp b/ydb/library/yql/sql/v1/sql_expression.cpp index b44df7cd9b9..d2fa2c205ce 100644 --- a/ydb/library/yql/sql/v1/sql_expression.cpp +++ b/ydb/library/yql/sql/v1/sql_expression.cpp @@ -169,6 +169,12 @@ bool ChangefeedSettingsEntry(const TRule_changefeed_settings_entry& node, TSqlEx return false; } settings.RetentionPeriod = exprNode; + } else if (to_lower(id.Name) == "topic_min_active_partitions") { + if (!exprNode->IsIntegerLiteral()) { + ctx.Context().Error() << "Literal of integer type is expected for " << id.Name; + return false; + } + settings.TopicPartitions = exprNode; } else if (to_lower(id.Name) == "aws_region") { if (!exprNode->IsLiteral() || exprNode->GetLiteralType() != "String") { ctx.Context().Error() << "Literal of String type is expected for " << id.Name; diff --git a/ydb/library/yql/sql/v1/sql_ut.cpp b/ydb/library/yql/sql/v1/sql_ut.cpp index a659c501d1e..22da243ec8f 100644 --- a/ydb/library/yql/sql/v1/sql_ut.cpp +++ b/ydb/library/yql/sql/v1/sql_ut.cpp @@ -1967,6 +1967,7 @@ Y_UNIT_TEST_SUITE(SqlParsingOnly) { VIRTUAL_TIMESTAMPS = FALSE, RESOLVED_TIMESTAMPS = Interval("PT1S"), RETENTION_PERIOD = Interval("P1D"), + TOPIC_MIN_ACTIVE_PARTITIONS = 10, AWS_REGION = 'aws:region' ) ); @@ -1986,6 +1987,7 @@ Y_UNIT_TEST_SUITE(SqlParsingOnly) { UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("false")); UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("resolved_timestamps")); UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("retention_period")); + UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("topic_min_active_partitions")); UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("aws_region")); UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("aws:region")); } @@ -3915,6 +3917,19 @@ select FormatType($f()); UNIT_ASSERT_NO_DIFF(Err2Str(res), "<main>:5:99: Error: Literal of Interval type is expected for RETENTION_PERIOD\n"); } + Y_UNIT_TEST(InvalidChangefeedTopicPartitions) { + auto req = R"( + USE plato; + CREATE TABLE tableName ( + Key Uint32, PRIMARY KEY (Key), + CHANGEFEED feedName WITH (MODE = "KEYS_ONLY", FORMAT = "json", TOPIC_MIN_ACTIVE_PARTITIONS = "foo") + ); + )"; + auto res = SqlToYql(req); + UNIT_ASSERT(!res.Root); + UNIT_ASSERT_NO_DIFF(Err2Str(res), "<main>:5:110: Error: Literal of integer type is expected for TOPIC_MIN_ACTIVE_PARTITIONS\n"); + } + Y_UNIT_TEST(InvalidChangefeedAwsRegion) { auto req = R"( USE plato; diff --git a/ydb/public/api/protos/ydb_table.proto b/ydb/public/api/protos/ydb_table.proto index d2e19d4d811..c6d0bb1b6c1 100644 --- a/ydb/public/api/protos/ydb_table.proto +++ b/ydb/public/api/protos/ydb_table.proto @@ -9,6 +9,7 @@ import "ydb/public/api/protos/ydb_query_stats.proto"; import "ydb/public/api/protos/ydb_value.proto"; import "ydb/public/api/protos/ydb_scheme.proto"; import "ydb/public/api/protos/ydb_status_codes.proto"; +import "ydb/public/api/protos/ydb_topic.proto"; import "ydb/public/api/protos/ydb_formats.proto"; import "google/protobuf/empty.proto"; @@ -165,6 +166,8 @@ message Changefeed { string aws_region = 8 [(length).le = 128]; // Periodically emit resolved timestamps. If unspecified, resolved timestamps are not emitted. google.protobuf.Duration resolved_timestamps_interval = 9; + // Partitioning settings of underlying topic. + Topic.PartitioningSettings topic_partitioning_settings = 10; } message ChangefeedDescription { |