aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorilnaz <ilnaz@ydb.tech>2023-10-13 12:51:48 +0300
committerilnaz <ilnaz@ydb.tech>2023-10-13 13:40:59 +0300
commit9d5156a9810bb20f3c0406268a8beb2701a114c3 (patch)
treeaab9009126051aa4c864be6b3a320ecda2cd12d3
parentd8f0ea2538f0f50147d3d4c5fcbe0297b504ab12 (diff)
downloadydb-9d5156a9810bb20f3c0406268a8beb2701a114c3.tar.gz
Explicit number of changefeed's underlying topic partitions KIKIMR-19460
-rw-r--r--ydb/core/grpc_services/rpc_alter_table.cpp13
-rw-r--r--ydb/core/kqp/provider/yql_kikimr_exec.cpp13
-rw-r--r--ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp109
-rw-r--r--ydb/core/protos/flat_scheme_op.proto1
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp67
-rw-r--r--ydb/core/tx/schemeshard/ut_cdc_stream/ut_cdc_stream.cpp58
-rw-r--r--ydb/library/yql/sql/v1/format/sql_format_ut.cpp2
-rw-r--r--ydb/library/yql/sql/v1/node.h1
-rw-r--r--ydb/library/yql/sql/v1/query.cpp3
-rw-r--r--ydb/library/yql/sql/v1/sql_expression.cpp6
-rw-r--r--ydb/library/yql/sql/v1/sql_ut.cpp15
-rw-r--r--ydb/public/api/protos/ydb_table.proto3
12 files changed, 269 insertions, 22 deletions
diff --git a/ydb/core/grpc_services/rpc_alter_table.cpp b/ydb/core/grpc_services/rpc_alter_table.cpp
index e82991093d1..fe01512b5aa 100644
--- a/ydb/core/grpc_services/rpc_alter_table.cpp
+++ b/ydb/core/grpc_services/rpc_alter_table.cpp
@@ -498,10 +498,23 @@ private:
for (const auto& add : req->add_changefeeds()) {
auto op = modifyScheme->MutableCreateCdcStream();
op->SetTableName(name);
+
if (add.has_retention_period()) {
op->SetRetentionPeriodSeconds(add.retention_period().seconds());
}
+ if (add.has_topic_partitioning_settings()) {
+ i64 minActivePartitions = add.topic_partitioning_settings().min_active_partitions();
+ if (minActivePartitions < 0) {
+ NYql::TIssues issues;
+ issues.AddIssue(NYql::TIssue("Topic partitions count must be positive"));
+ return Reply(Ydb::StatusIds::BAD_REQUEST, issues, ctx);
+ } else if (minActivePartitions == 0) {
+ minActivePartitions = 1;
+ }
+ op->SetTopicPartitions(minActivePartitions);
+ }
+
StatusIds::StatusCode code;
TString error;
if (!FillChangefeedDescription(*op->MutableStreamDescription(), add, code, error)) {
diff --git a/ydb/core/kqp/provider/yql_kikimr_exec.cpp b/ydb/core/kqp/provider/yql_kikimr_exec.cpp
index 81c21614d4a..4242f27a146 100644
--- a/ydb/core/kqp/provider/yql_kikimr_exec.cpp
+++ b/ydb/core/kqp/provider/yql_kikimr_exec.cpp
@@ -1442,6 +1442,19 @@ public:
const auto duration = TDuration::FromValue(value);
auto& retention = *add_changefeed->mutable_retention_period();
retention.set_seconds(duration.Seconds());
+ } else if (name == "topic_min_active_partitions") {
+ auto value = TString(
+ setting.Value().Cast<TCoDataCtor>().Literal().Cast<TCoAtom>().Value()
+ );
+
+ i64 minActivePartitions;
+ if (!TryFromString(value, minActivePartitions) || minActivePartitions <= 0) {
+ ctx.AddError(TIssue(ctx.GetPosition(setting.Name().Pos()),
+ TStringBuilder() << name << " must be greater than 0"));
+ return SyncError();
+ }
+
+ add_changefeed->mutable_topic_partitioning_settings()->set_min_active_partitions(minActivePartitions);
} else if (name == "aws_region") {
auto value = TString(
setting.Value().Cast<TCoDataCtor>().Literal().Cast<TCoAtom>().Value()
diff --git a/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp b/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp
index 74af347cdc2..3fe92a66fb1 100644
--- a/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp
+++ b/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp
@@ -3808,6 +3808,115 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
}
}
+ Y_UNIT_TEST(ChangefeedTopicPartitions) {
+ using namespace NTopic;
+
+ TKikimrRunner kikimr(TKikimrSettings().SetPQConfig(DefaultPQConfig()));
+ auto pq = TTopicClient(kikimr.GetDriver(), TTopicClientSettings().Database("/Root"));
+ auto db = kikimr.GetTableClient();
+ auto session = db.CreateSession().GetValueSync().GetSession();
+
+ { // Uint64 key
+ auto query = R"(
+ --!syntax_v1
+ CREATE TABLE `/Root/table_1` (
+ Key Uint64,
+ Value String,
+ PRIMARY KEY (Key)
+ );
+ )";
+
+ auto result = session.ExecuteSchemeQuery(query).GetValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ }
+
+ { // default
+ auto query = R"(
+ --!syntax_v1
+ ALTER TABLE `/Root/table_1` ADD CHANGEFEED `feed_1` WITH (
+ MODE = 'KEYS_ONLY', FORMAT = 'JSON'
+ );
+ )";
+
+ const auto result = session.ExecuteSchemeQuery(query).GetValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+
+ auto desc = pq.DescribeTopic("/Root/table_1/feed_1").ExtractValueSync();
+ UNIT_ASSERT_C(desc.IsSuccess(), desc.GetIssues().ToString());
+ UNIT_ASSERT_VALUES_EQUAL(desc.GetTopicDescription().GetPartitions().size(), 1);
+ }
+
+ { // custom
+ auto query = R"(
+ --!syntax_v1
+ ALTER TABLE `/Root/table_1` ADD CHANGEFEED `feed_2` WITH (
+ MODE = 'KEYS_ONLY', FORMAT = 'JSON', TOPIC_MIN_ACTIVE_PARTITIONS = 10
+ );
+ )";
+
+ const auto result = session.ExecuteSchemeQuery(query).GetValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+
+ auto desc = pq.DescribeTopic("/Root/table_1/feed_2").ExtractValueSync();
+ UNIT_ASSERT_C(desc.IsSuccess(), desc.GetIssues().ToString());
+ UNIT_ASSERT_VALUES_EQUAL(desc.GetTopicDescription().GetPartitions().size(), 10);
+ }
+
+ { // non-positive (invalid)
+ auto query = R"(
+ --!syntax_v1
+ ALTER TABLE `/Root/table_1` ADD CHANGEFEED `feed_3` WITH (
+ MODE = 'KEYS_ONLY', FORMAT = 'JSON', TOPIC_MIN_ACTIVE_PARTITIONS = 0
+ );
+ )";
+
+ const auto result = session.ExecuteSchemeQuery(query).GetValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::GENERIC_ERROR, result.GetIssues().ToString());
+ }
+
+ { // Utf8 key
+ auto query = R"(
+ --!syntax_v1
+ CREATE TABLE `/Root/table_2` (
+ Key Utf8,
+ Value String,
+ PRIMARY KEY (Key)
+ );
+ )";
+
+ auto result = session.ExecuteSchemeQuery(query).GetValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ }
+
+ { // default
+ auto query = R"(
+ --!syntax_v1
+ ALTER TABLE `/Root/table_2` ADD CHANGEFEED `feed_1` WITH (
+ MODE = 'KEYS_ONLY', FORMAT = 'JSON'
+ );
+ )";
+
+ const auto result = session.ExecuteSchemeQuery(query).GetValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+
+ auto desc = pq.DescribeTopic("/Root/table_2/feed_1").ExtractValueSync();
+ UNIT_ASSERT_C(desc.IsSuccess(), desc.GetIssues().ToString());
+ UNIT_ASSERT_VALUES_EQUAL(desc.GetTopicDescription().GetPartitions().size(), 1);
+ }
+
+ { // custom
+ auto query = R"(
+ --!syntax_v1
+ ALTER TABLE `/Root/table_2` ADD CHANGEFEED `feed_2` WITH (
+ MODE = 'KEYS_ONLY', FORMAT = 'JSON', TOPIC_MIN_ACTIVE_PARTITIONS = 10
+ );
+ )";
+
+ const auto result = session.ExecuteSchemeQuery(query).GetValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::BAD_REQUEST, result.GetIssues().ToString());
+ }
+ }
+
Y_UNIT_TEST(ChangefeedAttributes) {
TKikimrRunner kikimr(TKikimrSettings().SetPQConfig(DefaultPQConfig()));
auto db = kikimr.GetTableClient();
diff --git a/ydb/core/protos/flat_scheme_op.proto b/ydb/core/protos/flat_scheme_op.proto
index f6ece2ed23c..8d02114ab4d 100644
--- a/ydb/core/protos/flat_scheme_op.proto
+++ b/ydb/core/protos/flat_scheme_op.proto
@@ -761,6 +761,7 @@ message TCreateCdcStream {
optional string TableName = 1;
optional TCdcStreamDescription StreamDescription = 2;
optional uint64 RetentionPeriodSeconds = 3 [default = 86400]; // 1d by default
+ optional uint32 TopicPartitions = 4;
}
message TAlterCdcStream {
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp
index 9dc0d4b9672..96bf95347cd 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp
@@ -723,6 +723,37 @@ TVector<ISubOperation::TPtr> CreateNewCdcStream(TOperationId opId, const TTxTran
<< "Initial scan is not supported yet")};
}
+ Y_ABORT_UNLESS(context.SS->Tables.contains(tablePath.Base()->PathId));
+ auto table = context.SS->Tables.at(tablePath.Base()->PathId);
+
+ TVector<TString> boundaries;
+ if (op.HasTopicPartitions()) {
+ if (op.GetTopicPartitions() <= 0) {
+ return {CreateReject(opId, NKikimrScheme::StatusInvalidParameter, "Topic partitions count must be greater than 0")};
+ }
+
+ const auto& keyColumns = table->KeyColumnIds;
+ const auto& columns = table->Columns;
+
+ Y_ABORT_UNLESS(!keyColumns.empty());
+ Y_ABORT_UNLESS(columns.contains(keyColumns.at(0)));
+ const auto firstKeyColumnType = columns.at(keyColumns.at(0)).PType;
+
+ if (!TSchemeShard::FillUniformPartitioning(boundaries, keyColumns.size(), firstKeyColumnType, op.GetTopicPartitions(), AppData()->TypeRegistry, errStr)) {
+ return {CreateReject(opId, NKikimrScheme::StatusInvalidParameter, errStr)};
+ }
+ } else {
+ const auto& partitions = table->GetPartitions();
+ boundaries.reserve(partitions.size() - 1);
+
+ for (ui32 i = 0; i < partitions.size(); ++i) {
+ const auto& partition = partitions.at(i);
+ if (i != partitions.size() - 1) {
+ boundaries.push_back(partition.EndOfRange);
+ }
+ }
+ }
+
TVector<ISubOperation::TPtr> result;
if (initialScan) {
@@ -759,16 +790,12 @@ TVector<ISubOperation::TPtr> CreateNewCdcStream(TOperationId opId, const TTxTran
}
{
- Y_ABORT_UNLESS(context.SS->Tables.contains(tablePath.Base()->PathId));
- auto table = context.SS->Tables.at(tablePath.Base()->PathId);
- const auto& partitions = table->GetPartitions();
-
auto outTx = TransactionTemplate(streamPath.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpCreatePersQueueGroup);
outTx.SetFailOnExist(!acceptExisted);
auto& desc = *outTx.MutableCreatePersQueueGroup();
desc.SetName("streamImpl");
- desc.SetTotalGroupCount(partitions.size());
+ desc.SetTotalGroupCount(op.HasTopicPartitions() ? op.GetTopicPartitions() : table->GetPartitions().size());
desc.SetPartitionPerTablet(2);
auto& pqConfig = *desc.MutablePQTabletConfig();
@@ -795,23 +822,19 @@ TVector<ISubOperation::TPtr> CreateNewCdcStream(TOperationId opId, const TTxTran
}
}
- for (ui32 i = 0; i < partitions.size(); ++i) {
- const auto& cur = partitions.at(i);
-
- if (i != partitions.size() - 1) {
- TSerializedCellVec endKey(cur.EndOfRange);
- Y_ABORT_UNLESS(endKey.GetCells().size() <= table->KeyColumnIds.size());
-
- TString errStr;
- auto& boundary = *desc.AddPartitionBoundaries();
- for (ui32 ki = 0; ki < endKey.GetCells().size(); ++ki) {
- const auto& cell = endKey.GetCells()[ki];
- const auto tag = table->KeyColumnIds.at(ki);
- Y_ABORT_UNLESS(table->Columns.contains(tag));
- const auto typeId = table->Columns.at(tag).PType;
- const bool ok = NMiniKQL::CellToValue(typeId, cell, *boundary.AddTuple(), errStr);
- Y_ABORT_UNLESS(ok, "Failed to build key tuple at position %" PRIu32 " error: %s", ki, errStr.data());
- }
+ for (const auto& serialized : boundaries) {
+ TSerializedCellVec endKey(serialized);
+ Y_ABORT_UNLESS(endKey.GetCells().size() <= table->KeyColumnIds.size());
+
+ TString errStr;
+ auto& boundary = *desc.AddPartitionBoundaries();
+ for (ui32 ki = 0; ki < endKey.GetCells().size(); ++ki) {
+ const auto& cell = endKey.GetCells()[ki];
+ const auto tag = table->KeyColumnIds.at(ki);
+ Y_ABORT_UNLESS(table->Columns.contains(tag));
+ const auto typeId = table->Columns.at(tag).PType;
+ const bool ok = NMiniKQL::CellToValue(typeId, cell, *boundary.AddTuple(), errStr);
+ Y_ABORT_UNLESS(ok, "Failed to build key tuple at position %" PRIu32 " error: %s", ki, errStr.data());
}
}
diff --git a/ydb/core/tx/schemeshard/ut_cdc_stream/ut_cdc_stream.cpp b/ydb/core/tx/schemeshard/ut_cdc_stream/ut_cdc_stream.cpp
index 3cd979308ef..97385b36f82 100644
--- a/ydb/core/tx/schemeshard/ut_cdc_stream/ut_cdc_stream.cpp
+++ b/ydb/core/tx/schemeshard/ut_cdc_stream/ut_cdc_stream.cpp
@@ -172,6 +172,64 @@ Y_UNIT_TEST_SUITE(TCdcStreamTests) {
}
}
+ Y_UNIT_TEST(TopicPartitions) {
+ TTestBasicRuntime runtime;
+ TTestEnv env(runtime, TTestEnvOptions().EnableProtoSourceIdInfo(true));
+ ui64 txId = 100;
+
+ for (const auto& keyType : TVector<TString>{"Uint64", "Uint32", "Utf8"}) {
+ const auto status = keyType != "Utf8"
+ ? NKikimrScheme::StatusAccepted
+ : NKikimrScheme::StatusInvalidParameter;
+
+ TestCreateTable(runtime, ++txId, "/MyRoot", Sprintf(R"(
+ Name: "Table%s"
+ Columns { Name: "key" Type: "%s" }
+ Columns { Name: "value" Type: "Utf8" }
+ KeyColumnNames: ["key"]
+ )", keyType.c_str(), keyType.c_str()));
+ env.TestWaitNotification(runtime, txId);
+
+ TestCreateCdcStream(runtime, ++txId, "/MyRoot", Sprintf(R"(
+ TableName: "Table%s"
+ StreamDescription {
+ Name: "Stream"
+ Mode: ECdcStreamModeKeysOnly
+ Format: ECdcStreamFormatProto
+ }
+ TopicPartitions: 10
+ )", keyType.c_str()), {status});
+
+ if (status != NKikimrScheme::StatusAccepted) {
+ continue;
+ }
+
+ env.TestWaitNotification(runtime, txId);
+ TestDescribeResult(DescribePrivatePath(runtime, Sprintf("/MyRoot/Table%s/Stream/streamImpl", keyType.c_str())), {
+ NLs::PathExist,
+ NLs::CheckPartCount("streamImpl", 10, 2, 5, 10),
+ });
+ }
+
+ TestCreateTable(runtime, ++txId, "/MyRoot", R"(
+ Name: "Table"
+ Columns { Name: "key" Type: "Uint64" }
+ Columns { Name: "value" Type: "Utf8" }
+ KeyColumnNames: ["key"]
+ )");
+ env.TestWaitNotification(runtime, txId);
+
+ TestCreateCdcStream(runtime, ++txId, "/MyRoot", R"(
+ TableName: "Table"
+ StreamDescription {
+ Name: "Stream"
+ Mode: ECdcStreamModeKeysOnly
+ Format: ECdcStreamFormatProto
+ }
+ TopicPartitions: 0
+ )", {NKikimrScheme::StatusInvalidParameter});
+ }
+
Y_UNIT_TEST(Attributes) {
TTestBasicRuntime runtime;
TTestEnv env(runtime, TTestEnvOptions().EnableProtoSourceIdInfo(true));
diff --git a/ydb/library/yql/sql/v1/format/sql_format_ut.cpp b/ydb/library/yql/sql/v1/format/sql_format_ut.cpp
index 14c7693c04a..fbb205bb16b 100644
--- a/ydb/library/yql/sql/v1/format/sql_format_ut.cpp
+++ b/ydb/library/yql/sql/v1/format/sql_format_ut.cpp
@@ -441,6 +441,8 @@ Y_UNIT_TEST_SUITE(CheckSqlFormatter) {
"ALTER TABLE user\n\tADD CHANGEFEED user WITH (virtual_timestamps = FALSE);\n\n"},
{"alter table user add changefeed user with (resolved_timestamps = Interval(\"PT1S\"))",
"ALTER TABLE user\n\tADD CHANGEFEED user WITH (resolved_timestamps = Interval(\"PT1S\"));\n\n"},
+ {"alter table user add changefeed user with (topic_min_active_partitions = 1)",
+ "ALTER TABLE user\n\tADD CHANGEFEED user WITH (topic_min_active_partitions = 1);\n\n"},
};
TSetup setup;
diff --git a/ydb/library/yql/sql/v1/node.h b/ydb/library/yql/sql/v1/node.h
index 7dd8590d621..0c2b6bd141e 100644
--- a/ydb/library/yql/sql/v1/node.h
+++ b/ydb/library/yql/sql/v1/node.h
@@ -1023,6 +1023,7 @@ namespace NSQLTranslationV1 {
TNodePtr VirtualTimestamps;
TNodePtr ResolvedTimestamps;
TNodePtr RetentionPeriod;
+ TNodePtr TopicPartitions;
TNodePtr AwsRegion;
std::optional<std::variant<TLocalSinkSettings>> SinkSettings;
};
diff --git a/ydb/library/yql/sql/v1/query.cpp b/ydb/library/yql/sql/v1/query.cpp
index c37489868b2..feedcf290ce 100644
--- a/ydb/library/yql/sql/v1/query.cpp
+++ b/ydb/library/yql/sql/v1/query.cpp
@@ -187,6 +187,9 @@ static INode::TPtr CreateChangefeedDesc(const TChangefeedDescription& desc, cons
if (desc.Settings.RetentionPeriod) {
settings = node.L(settings, node.Q(node.Y(node.Q("retention_period"), desc.Settings.RetentionPeriod)));
}
+ if (desc.Settings.TopicPartitions) {
+ settings = node.L(settings, node.Q(node.Y(node.Q("topic_min_active_partitions"), desc.Settings.TopicPartitions)));
+ }
if (desc.Settings.AwsRegion) {
settings = node.L(settings, node.Q(node.Y(node.Q("aws_region"), desc.Settings.AwsRegion)));
}
diff --git a/ydb/library/yql/sql/v1/sql_expression.cpp b/ydb/library/yql/sql/v1/sql_expression.cpp
index b44df7cd9b9..d2fa2c205ce 100644
--- a/ydb/library/yql/sql/v1/sql_expression.cpp
+++ b/ydb/library/yql/sql/v1/sql_expression.cpp
@@ -169,6 +169,12 @@ bool ChangefeedSettingsEntry(const TRule_changefeed_settings_entry& node, TSqlEx
return false;
}
settings.RetentionPeriod = exprNode;
+ } else if (to_lower(id.Name) == "topic_min_active_partitions") {
+ if (!exprNode->IsIntegerLiteral()) {
+ ctx.Context().Error() << "Literal of integer type is expected for " << id.Name;
+ return false;
+ }
+ settings.TopicPartitions = exprNode;
} else if (to_lower(id.Name) == "aws_region") {
if (!exprNode->IsLiteral() || exprNode->GetLiteralType() != "String") {
ctx.Context().Error() << "Literal of String type is expected for " << id.Name;
diff --git a/ydb/library/yql/sql/v1/sql_ut.cpp b/ydb/library/yql/sql/v1/sql_ut.cpp
index a659c501d1e..22da243ec8f 100644
--- a/ydb/library/yql/sql/v1/sql_ut.cpp
+++ b/ydb/library/yql/sql/v1/sql_ut.cpp
@@ -1967,6 +1967,7 @@ Y_UNIT_TEST_SUITE(SqlParsingOnly) {
VIRTUAL_TIMESTAMPS = FALSE,
RESOLVED_TIMESTAMPS = Interval("PT1S"),
RETENTION_PERIOD = Interval("P1D"),
+ TOPIC_MIN_ACTIVE_PARTITIONS = 10,
AWS_REGION = 'aws:region'
)
);
@@ -1986,6 +1987,7 @@ Y_UNIT_TEST_SUITE(SqlParsingOnly) {
UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("false"));
UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("resolved_timestamps"));
UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("retention_period"));
+ UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("topic_min_active_partitions"));
UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("aws_region"));
UNIT_ASSERT_VALUES_UNEQUAL(TString::npos, line.find("aws:region"));
}
@@ -3915,6 +3917,19 @@ select FormatType($f());
UNIT_ASSERT_NO_DIFF(Err2Str(res), "<main>:5:99: Error: Literal of Interval type is expected for RETENTION_PERIOD\n");
}
+ Y_UNIT_TEST(InvalidChangefeedTopicPartitions) {
+ auto req = R"(
+ USE plato;
+ CREATE TABLE tableName (
+ Key Uint32, PRIMARY KEY (Key),
+ CHANGEFEED feedName WITH (MODE = "KEYS_ONLY", FORMAT = "json", TOPIC_MIN_ACTIVE_PARTITIONS = "foo")
+ );
+ )";
+ auto res = SqlToYql(req);
+ UNIT_ASSERT(!res.Root);
+ UNIT_ASSERT_NO_DIFF(Err2Str(res), "<main>:5:110: Error: Literal of integer type is expected for TOPIC_MIN_ACTIVE_PARTITIONS\n");
+ }
+
Y_UNIT_TEST(InvalidChangefeedAwsRegion) {
auto req = R"(
USE plato;
diff --git a/ydb/public/api/protos/ydb_table.proto b/ydb/public/api/protos/ydb_table.proto
index d2e19d4d811..c6d0bb1b6c1 100644
--- a/ydb/public/api/protos/ydb_table.proto
+++ b/ydb/public/api/protos/ydb_table.proto
@@ -9,6 +9,7 @@ import "ydb/public/api/protos/ydb_query_stats.proto";
import "ydb/public/api/protos/ydb_value.proto";
import "ydb/public/api/protos/ydb_scheme.proto";
import "ydb/public/api/protos/ydb_status_codes.proto";
+import "ydb/public/api/protos/ydb_topic.proto";
import "ydb/public/api/protos/ydb_formats.proto";
import "google/protobuf/empty.proto";
@@ -165,6 +166,8 @@ message Changefeed {
string aws_region = 8 [(length).le = 128];
// Periodically emit resolved timestamps. If unspecified, resolved timestamps are not emitted.
google.protobuf.Duration resolved_timestamps_interval = 9;
+ // Partitioning settings of underlying topic.
+ Topic.PartitioningSettings topic_partitioning_settings = 10;
}
message ChangefeedDescription {