summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoralexnick <[email protected]>2022-06-16 11:05:45 +0300
committeralexnick <[email protected]>2022-06-16 11:05:45 +0300
commitfd94790c9e2d68cf1ec1ac1d0b286e1a67184db8 (patch)
treeae58f95db427c67edf4a91a5b3a514817ac1ff59
parent9a31a385c5e03d4b4269db196fbef002b1553a58 (diff)
alter with partial fields usage
ref:614b6696343378fc384a9465085d98e2f5651c75
-rw-r--r--ydb/public/api/protos/CMakeLists.txt1
-rw-r--r--ydb/public/api/protos/ydb_scheme.proto1
-rw-r--r--ydb/public/api/protos/ydb_topic.proto232
-rw-r--r--ydb/public/lib/validation/helpers.cpp8
4 files changed, 111 insertions, 131 deletions
diff --git a/ydb/public/api/protos/CMakeLists.txt b/ydb/public/api/protos/CMakeLists.txt
index a573bf200a4..0f093e35576 100644
--- a/ydb/public/api/protos/CMakeLists.txt
+++ b/ydb/public/api/protos/CMakeLists.txt
@@ -45,6 +45,7 @@ target_proto_messages(api-protos PRIVATE
${CMAKE_SOURCE_DIR}/ydb/public/api/protos/ydb_table.proto
${CMAKE_SOURCE_DIR}/ydb/public/api/protos/ydb_value.proto
${CMAKE_SOURCE_DIR}/ydb/public/api/protos/ydb_s3_internal.proto
+ ${CMAKE_SOURCE_DIR}/ydb/public/api/protos/ydb_topic.proto
${CMAKE_SOURCE_DIR}/ydb/public/api/protos/yq.proto
)
generate_enum_serilization(api-protos
diff --git a/ydb/public/api/protos/ydb_scheme.proto b/ydb/public/api/protos/ydb_scheme.proto
index 4594effba62..7c61de6e7f6 100644
--- a/ydb/public/api/protos/ydb_scheme.proto
+++ b/ydb/public/api/protos/ydb_scheme.proto
@@ -58,6 +58,7 @@ message Entry {
COLUMN_TABLE = 13;
SEQUENCE = 15;
REPLICATION = 16;
+ TOPIC = 17;
}
// Name of scheme entry (dir2 of /dir1/dir2)
diff --git a/ydb/public/api/protos/ydb_topic.proto b/ydb/public/api/protos/ydb_topic.proto
index 1ea75514ef5..8d940a5d029 100644
--- a/ydb/public/api/protos/ydb_topic.proto
+++ b/ydb/public/api/protos/ydb_topic.proto
@@ -3,11 +3,12 @@ import "ydb/public/api/protos/ydb_operation.proto";
import "ydb/public/api/protos/ydb_scheme.proto";
import "ydb/public/api/protos/ydb_status_codes.proto";
import "ydb/public/api/protos/ydb_issue_message.proto";
-import "ydb/public/api/protos/annotations/validation.proto";
import "google/protobuf/duration.proto";
import "google/protobuf/timestamp.proto";
+import "google/protobuf/wrappers.proto";
+
package Ydb.Topic;
option java_package = "com.yandex.ydb.topic";
@@ -99,8 +100,7 @@ message StreamWriteMessage {
int64 partition_id = 3;
// Client can only use compression codecs from this set to write messages to topic, session will be closed with BAD_REQUEST otherwise.
- // See enum Codec above for values.
- repeated int64 supported_codecs = 10;
+ SupportedCodecs supported_codecs = 4;
}
// Represents portion of client messages.
@@ -425,9 +425,11 @@ message StreamReadMessage {
// InitRequest.max_lag and InitRequest.read_from could lead to skip of more messages.
// Server will return data starting from offset that is maximum of actual committed offset, read_offset (if set)
// and offsets calculated from InitRequest.max_lag and InitRequest.read_from.
- optional int64 read_offset = 2;
+// optional int64 read_offset = 2;
+ google.protobuf.Int64Value read_offset = 2;
// All messages with offset less than commit_offset are processed by client. Server will commit this position if this is not done yet.
- optional int64 commit_offset = 3;
+ //optional int64 commit_offset = 3;
+ google.protobuf.Int64Value commit_offset = 3;
}
// Command from server to stop and destroy concrete partition session.
@@ -474,6 +476,55 @@ message DropTopicResponse {
message DropTopicResult {
}
+// Description of supported codecs.
+message SupportedCodecs {
+ // List of supported codecs.
+ // See enum Codec above for values.
+ repeated int32 codecs = 1;
+}
+
+// Consumer description.
+message Consumer {
+ // Must have valid not empty name as a key.
+ string name = 1;
+ // Consumer may be marked as 'important'. It means messages for this consumer will never expire due to retention.
+ // User should take care that such consumer never stalls, to prevent running out of disk space.
+ // Flag that this consumer is important.
+ bool important = 2;
+ // All messages with smaller server written_at timestamp will be skipped.
+ google.protobuf.Timestamp read_from = 3;
+ reserved 4; // supported_format
+ // List of supported codecs by this consumer.
+ // supported_codecs on topic must be contained inside this list.
+ SupportedCodecs supported_codecs = 5;
+
+ // Attributes of consumer
+ map<string, string> attributes = 6;
+}
+
+
+// Consumer alter description.
+message AlterConsumer {
+ // Must have valid not empty name as a key.
+ string name = 1;
+ // Consumer may be marked as 'important'. It means messages for this consumer will never expire due to retention.
+ // User should take care that such consumer never stalls, to prevent running out of disk space.
+ // Flag that this consumer is important.
+// optional bool important = 2;
+ google.protobuf.BoolValue important = 2;
+ // All messages with smaller server written_at timestamp will be skipped.
+ google.protobuf.Timestamp read_from = 3;
+ reserved 4; // supported_format
+ // List of supported codecs by this consumer.
+ // supported_codecs on topic must be contained inside this list.
+ SupportedCodecs supported_codecs = 5;
+
+ // User and server attributes of consumer. Server attributes starts from "_" and will be validated by server.
+ // Leave the value blank to drop an attribute.
+ map<string, string> alter_attributes = 6;
+}
+
+
// Message for describing topic internals.
message TopicSettings {
oneof partitions {
@@ -483,44 +534,24 @@ message TopicSettings {
}
// How long data in partition should be stored. Must be greater than 0 and less than limit for this database.
// Default limit - 36 hours.
- google.protobuf.Duration retention_period = 2;
- // How much data in partition should be stored. Must be greater than 0 and less than limit for this database.
- int64 retention_storage_mb = 3;
- reserved 4; // supported_format.
+ google.protobuf.Duration retention_period = 3;
+ // How much data in partition should be stored. Must be greater than 0 and less than limit for this database. Zero means infinite limit.
+ int64 retention_storage_mb = 4;
+ reserved 5; // supported_format.
// List of allowed codecs for writers.
- // See enum Codec above for values.
// Writes with codec not from this list are forbidden.
- repeated int64 supported_codecs = 5;
- // Partition write speed in bytes per second. Must be less than database limit. Default limit - 1 MB/s.
- int64 partition_write_speed_bytes_per_second = 6;
- // Burst size for write in partition, in bytes. Must be less than database limit. Default limit - 1 MB.
- int64 partition_write_burst_bytes = 7;
+ SupportedCodecs supported_codecs = 6;
+ // Partition write speed in bytes per second. Must be less than database limit. Default limit - 1 MB/s - used when value is zero.
+ int64 partition_write_speed_bytes_per_second = 7;
+ // Burst size for write in partition, in bytes. Must be less than database limit. Default limit - 1 MB - used when value is zero.
+ int64 partition_write_burst_bytes = 8;
// User and server attributes of topic. Server attributes starts from "_" and will be validated by server.
- map<string, string> attributes = 8;
+ map<string, string> attributes = 9;
// List of consumers for this topic.
- repeated Consumer consumers = 9;
-
- // Consumer description.
- message Consumer {
- // Must have valid not empty name as a key.
- string name = 1;
- // Consumer may be marked as 'important'. It means messages for this consumer will never expire due to retention.
- // User should take care that such consumer never stalls, to prevent running out of disk space.
- // Flag that this consumer is important.
- bool important = 2;
- // All messages with smaller server written_at timestamp will be skipped.
- google.protobuf.Timestamp read_from = 3;
- reserved 4; // supported_format
- // List of supported codecs by this consumer.
- // See enum Codec above for values.
- // supported_codecs on topic must be contained inside this list.
- repeated int64 supported_codecs = 5;
+ repeated Consumer consumers = 10;
- // Attributes of consumer
- map<string, string> attributes = 6;
- }
}
// Create topic request sent from client to server.
@@ -528,8 +559,9 @@ message CreateTopicRequest {
Ydb.Operations.OperationParams operation_params = 1;
// Topic path.
string path = 2;
+
// Topic settings.
- TopicSettings settings = 4;
+ TopicSettings settings = 3;
}
@@ -560,45 +592,42 @@ message AlterTopicResponse {
// Update topic result message that will be inside UpdateTopicResponse.operation.
message AlterTopicResult {
-}
-
-// Add consumer for existing topic request.
-message AddConsumerRequest {
- Ydb.Operations.OperationParams operation_params = 1;
- // Topic path.
- string path = 2;
- // consumers to add
- TopicSettings.Consumer consumer = 3;
-}
-
-// Add consumer for existing topic response.
-message AddConsumerResponse {
- // Result of request will be inside operation.
- Ydb.Operations.Operation operation = 1;
-}
-
-// Add consumer result message that will be inside AddConsumerReponse.operation.
-message AddConsumerResult {
-}
+ oneof partitions {
+ // How many uniform partitions in topic. Must less than database limit. Default limit - 10.
+ int32 partitions_count = 1;
+ // 2 reserved for partition_at_keys
+ }
+ // How long data in partition should be stored. Must be greater than 0 and less than limit for this database.
+ // Default limit - 36 hours.
+ google.protobuf.Duration retention_period = 3;
+ // How much data in partition should be stored. Must be greater than 0 and less than limit for this database.
+// optional int64 retention_storage_mb = 4;
+ google.protobuf.Int64Value retention_storage_mb = 4;
+ reserved 5; // supported_format.
+ // List of allowed codecs for writers.
+ // See enum Codec above for values.
+ // Writes with codec not from this list are forbidden.
+ SupportedCodecs supported_codecs = 6;
-// Drop consumer request for existing topic.
-message DropConsumerRequest {
- Ydb.Operations.OperationParams operation_params = 1;
- // Topic path.
- string path = 2;
- // Name of consumer to drop.
- string consumer = 3;
-}
+ // Partition write speed in bytes per second. Must be less than database limit. Default limit - 1 MB/s.
+// optional int64 partition_write_speed_bytes_per_second = 7;
+ google.protobuf.Int64Value partition_write_speed_bytes_per_second = 7;
+ // Burst size for write in partition, in bytes. Must be less than database limit. Default limit - 1 MB.
+// optional int64 partition_write_burst_bytes = 8;
+ google.protobuf.Int64Value partition_write_burst_bytes = 8;
-// Drop consumer response for existing topic.
-message DropConsumerResponse {
- // Result of request will be inside operation.
- Ydb.Operations.Operation operation = 1;
+ // User and server attributes of topic. Server attributes starts from "_" and will be validated by server.
+ // Leave the value blank to drop an attribute.
+ map<string, string> alter_attributes = 9;
+
+ // Add consumers.
+ repeated Consumer add_consumers = 10;
+ // Remove consumers (by its names)
+ repeated string drop_consumers = 11;
+ // Alter consumers
+ repeated AlterConsumer alter_consumers = 12;
}
-// Drop consumer result message that will be inside DropConsumerReponse.operation.
-message DropConsumerResult {
-}
// Describe topic request sent from client to server.
message DescribeTopicRequest {
@@ -606,14 +635,6 @@ message DescribeTopicRequest {
// Topic path.
string path = 2;
-
- // If this message is present, response will include runtime topic statistics.
- IncludeStats include_stats = 3;
-
- message IncludeStats {
- // Consumer statistics for reading this topic may be included in response.
- string consumer = 1;
- }
}
// Describe topic response sent from server to client. If topic is not existed then response status will be "SCHEME_ERROR".
@@ -628,55 +649,4 @@ message DescribeTopicResult {
Ydb.Scheme.Entry self = 1;
// Settings of topic.
TopicSettings settings = 2;
-
- // Message containing information about concrete topic reading, if requested.
- TopicStats topic_stats = 3;
-
- message TopicStats {
- repeated PartitionStats partition_stats = 1;
-
- // Message containing information about concrete topic's partition reading.
- message PartitionStats {
- // Patition identifier inside topic.
- int64 partition_id = 1;
-
- // Partition contains messages with offsets in range [start, end).
- OffsetsRange partition_offsets = 2;
-
- // Host name of node where partition leader is running.
- string tablet_node = 3;
-
- // Statistics of particular consumer, if requested.
- ConsumerStats consumer_stats = 4;
-
- message ConsumerStats {
- // Offset of consumer committed message a.k.a. first not processed message.
- // If commit_offset == end_offset then all messages from partition are processed.
- int64 commit_offset = 2;
- // Consumer lag in time between committed and last messages in partition.
- int64 commit_time_lag_ms = 3;
-
- // Offset ranges client wants to commit, but server is waiting for commits of gaps.
- repeated OffsetsRange out_of_order_commit_offset_ranges = 4;
-
- // Offset of first not read message by consumer from this partition.
- // read_offset can be bigger that committed_offset - consumer could read some messages but not yet commit them.
- int64 read_offset = 5;
- // Consumer lag in time between read and last messages in partition.
- int64 read_time_lag_ms = 6;
-
- // Session identifier that locked and reading this partition right now.
- string session_id = 7;
- // Ip of node that created reading this session.
- string client_ip = 8;
- // Host name of proxy node that processing this reading session.
- string proxy_node = 9;
-
- // Assign identifier of actual partition assignment.
- int64 partition_session_id = 10;
- // Timestamp of partition session start.
- google.protobuf.Timestamp partition_session_started_at = 11;
- }
- }
- }
}
diff --git a/ydb/public/lib/validation/helpers.cpp b/ydb/public/lib/validation/helpers.cpp
index 0b30fdcec79..3d3c6b4b416 100644
--- a/ydb/public/lib/validation/helpers.cpp
+++ b/ydb/public/lib/validation/helpers.cpp
@@ -5,6 +5,7 @@
#include <google/protobuf/duration.pb.h>
#include <google/protobuf/empty.pb.h>
#include <google/protobuf/timestamp.pb.h>
+#include <google/protobuf/wrappers.pb.h>
#include <util/string/builder.h>
#include <util/string/subst.h>
@@ -55,6 +56,13 @@ bool IsCustomMessage(const google::protobuf::Descriptor* message) {
if (message->full_name() == google::protobuf::Timestamp::descriptor()->full_name()) {
return false;
}
+ if (message->full_name() == google::protobuf::Int64Value::descriptor()->full_name()) {
+ return false;
+ }
+ if (message->full_name() == google::protobuf::BoolValue::descriptor()->full_name()) {
+ return false;
+ }
+
if (message->options().map_entry()) {
return false;
}