aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorildar-khisam <ildar-khisam@yandex-team.ru>2022-06-15 22:18:09 +0300
committerildar-khisam <ildar-khisam@yandex-team.ru>2022-06-15 22:18:09 +0300
commit816bd0a9d59c9bdffcc2ca7fc652b016486eac2d (patch)
treec7c7651a38cf9e750a9b7107dea5c478af92ad7f
parent68d10ae003731c0e5e41cb6bb0f06524c2db02c6 (diff)
downloadydb-816bd0a9d59c9bdffcc2ca7fc652b016486eac2d.tar.gz
drafts for public grpc api final revision
fix some more issues fix more issues ref:ea53351742bcd9d93d43d7e895bf1eca9a2b412e
-rw-r--r--ydb/public/api/grpc/draft/ydb_topic_v1.proto8
-rw-r--r--ydb/public/api/protos/ydb_topic.proto548
2 files changed, 280 insertions, 276 deletions
diff --git a/ydb/public/api/grpc/draft/ydb_topic_v1.proto b/ydb/public/api/grpc/draft/ydb_topic_v1.proto
index d3bb31046a..23219ef6ef 100644
--- a/ydb/public/api/grpc/draft/ydb_topic_v1.proto
+++ b/ydb/public/api/grpc/draft/ydb_topic_v1.proto
@@ -27,7 +27,7 @@ service TopicService {
// <----------------
// [something went wrong] (status != SUCCESS, issues not empty)
// <----------------
- rpc StreamWrite(stream StreamWriteClientMessage) returns (stream StreamWriteServerMessage);
+ rpc StreamWrite(stream StreamWriteMessage.FromClient) returns (stream StreamWriteMessage.FromServer);
// Create Read Session
@@ -61,7 +61,7 @@ service TopicService {
// <----------------
// [something went wrong] (status != SUCCESS, issues not empty)
// <----------------
- rpc StreamRead(stream StreamReadClientMessage) returns (stream StreamReadServerMessage);
+ rpc StreamRead(stream StreamReadMessage.FromClient) returns (stream StreamReadMessage.FromServer);
// Describe topic command.
@@ -84,6 +84,6 @@ service TopicService {
rpc AddConsumer(AddConsumerRequest) returns (AddConsumerResponse);
- // Remove consumer command.
- rpc RemoveConsumer(RemoveConsumerRequest) returns (RemoveConsumerResponse);
+ // Drop consumer command.
+ rpc DropConsumer(RemoveConsumerRequest) returns (RemoveConsumerResponse);
}
diff --git a/ydb/public/api/protos/ydb_topic.proto b/ydb/public/api/protos/ydb_topic.proto
index bb73c62931..1ea75514ef 100644
--- a/ydb/public/api/protos/ydb_topic.proto
+++ b/ydb/public/api/protos/ydb_topic.proto
@@ -5,6 +5,7 @@ import "ydb/public/api/protos/ydb_status_codes.proto";
import "ydb/public/api/protos/ydb_issue_message.proto";
import "ydb/public/api/protos/annotations/validation.proto";
+import "google/protobuf/duration.proto";
import "google/protobuf/timestamp.proto";
package Ydb.Topic;
@@ -19,6 +20,7 @@ enum Codec {
CODEC_GZIP = 2;
CODEC_LZOP = 3;
CODEC_ZSTD = 4;
+ CODEC_CUSTOM = 10000;
}
// Represents range [start, end).
@@ -37,23 +39,42 @@ message UpdateTokenRequest {
message UpdateTokenResponse {
}
-// Request for write session. Contains one of:
-// InitRequest - handshake request.
-// WriteRequest - portion of data to be written.
-// UpdateTokenRequest - user credentials if update is needed.
-message StreamWriteClientMessage {
- oneof client_message {
- InitRequest init_request = 1;
- WriteRequest write_request = 2;
- UpdateTokenRequest update_token_request = 3;
+// Messages for bidirectional streaming rpc StreamWrite
+message StreamWriteMessage {
+ // Client-server message for write session. Contains one of:
+ // InitRequest - handshake request.
+ // WriteRequest - portion of data to be written.
+ // UpdateTokenRequest - user credentials if update is needed.
+ message FromClient {
+ oneof client_message {
+ InitRequest init_request = 1;
+ WriteRequest write_request = 2;
+ UpdateTokenRequest update_token_request = 3;
+ }
+ }
+
+ // Server-client message for write session. Contains either non-success status, or one of:
+ // InitResponse - correct handshake response.
+ // WriteResponse - acknowledgment of storing client messages.
+ // UpdateTokenResponse - acknowledgment of reauthentication and reauthorization.
+ message FromServer {
+ // Server status of response.
+ Ydb.StatusIds.StatusCode status = 1;
+
+ // Issues if any.
+ repeated Ydb.Issue.IssueMessage issues = 2;
+
+ oneof server_message {
+ InitResponse init_response = 3;
+ WriteResponse write_response = 4;
+ UpdateTokenResponse update_token_response = 5;
+ }
}
// Handshake request that must be sent to server first.
message InitRequest {
// Full path of topic to write to.
- // e.g. /database_path/dir/my-topic
- // or /database_path/dir/my-topic.federation-mirrors/sas
- string topic = 1;
+ string path = 1;
// Message group identifier of client data stream.
string message_group_id = 2;
// User metadata attached to this write session.
@@ -67,6 +88,21 @@ message StreamWriteClientMessage {
}
}
+ // Response for handshake.
+ message InitResponse {
+ // Last persisted message's sequence number for this message group.
+ // Zero for new message group.
+ int64 last_seq_no = 1;
+ // Unique identifier of write session. Used for debug purposes.
+ string session_id = 2;
+ // Identifier of partition that is matched for this write session.
+ int64 partition_id = 3;
+
+ // Client can only use compression codecs from this set to write messages to topic, session will be closed with BAD_REQUEST otherwise.
+ // See enum Codec above for values.
+ repeated int64 supported_codecs = 10;
+ }
+
// Represents portion of client messages.
message WriteRequest {
repeated MessageData messages = 1;
@@ -87,39 +123,6 @@ message StreamWriteClientMessage {
int64 uncompressed_size = 4;
}
}
-}
-
-// Response for write session. Contains either non-success status, or one of:
-// InitResponse - correct handshake response.
-// WriteResponse - acknowledgment of storing client messages.
-// UpdateTokenResponse - acknowledgment of reauthentication and reauthorization.
-message StreamWriteServerMessage {
- // Server status of response.
- Ydb.StatusIds.StatusCode status = 1;
-
- // Issues if any.
- repeated Ydb.Issue.IssueMessage issues = 2;
-
- oneof server_message {
- InitResponse init_response = 3;
- WriteResponse write_response = 4;
- UpdateTokenResponse update_token_response = 5;
- }
-
- // Response for handshake.
- message InitResponse {
- // Last persisted message's sequence number for this message group.
- // Zero for new message group.
- int64 last_seq_no = 1;
- // Unique identifier of write session. Used for debug purposes.
- string session_id = 2;
- // Identifier of partition that is matched for this write session.
- int64 partition_id = 3;
-
- // Client can only use compression codecs from this set to write messages to topic, session will be closed with BAD_REQUEST otherwise.
- // See enum Codec above for values.
- repeated int64 supported_codecs = 10;
- }
// Message that represents acknowledgment for sequence of client messages.
// This sequence is persisted together so write statistics is for messages batch.
@@ -164,37 +167,86 @@ message StreamWriteServerMessage {
// Message with write statistics.
message WriteStatistics {
// Time spent in persisting of data. Same for each message in response.
- google.protobuf.Duration persist = 1;
+ google.protobuf.Duration persisting_time = 1;
// Time spent in queue before persisting, minimal of all messages in response.
- google.protobuf.Duration queued_in_partition_min = 2;
+ google.protobuf.Duration min_queue_wait_time = 2;
// Time spent in queue before persisting, maximal of all messages in response.
- google.protobuf.Duration queued_in_partition_max = 3;
+ google.protobuf.Duration max_queue_wait_time = 3;
// Time spent awaiting for partition write quota. Same for each message in response.
- google.protobuf.Duration throttled_on_partition = 4;
+ google.protobuf.Duration partition_quota_wait_time = 4;
// Time spent awaiting for topic write quota. Same for each message in response.
- google.protobuf.Duration throttled_on_topic = 5;
+ google.protobuf.Duration topic_quota_wait_time = 5;
}
}
}
+// Within a StreamRead session delivered messages are separated by partition.
+// Reads from a single partition are represented by a partition session.
+message PartitionSession {
+ // Identitifier of partition session. Unique inside one RPC call.
+ int64 partition_session_id = 1;
+ // Topic path of partition.
+ string path = 2;
+ // Partition identifier.
+ int64 partition_id = 3;
+}
-// Request for read session. Contains one of:
-// InitRequest - handshake request.
-// ReadRequest - request for data.
-// CommitOffsetRequest - request for commit of some read data.
-// StartPartitionSessionResponse - signal for server that client is ready to get data from partition.
-// StopPartitionSessionResponse - signal for server that client finished working with partition. Must be sent only after corresponding request from server.
-// PartitionSessionStatusRequest - request for session status
-// UpdateTokenRequest - request to update auth token
-message StreamReadClientMessage {
- oneof client_message {
- InitRequest init_request = 1;
- ReadRequest read_request = 2;
- StartPartitionSessionResponse start_partition_session_response = 3;
- StopPartitionSessionResponse stop_partition_session_response = 4;
- CommitOffsetRequest commit_offset_request = 5;
- PartitionSessionStatusRequest partition_session_status_request = 6;
- UpdateTokenRequest update_token_request = 7;
+// Messages for bidirectional streaming rpc StreamingRead
+message StreamReadMessage {
+ // Client-server message for read session. Contains one of:
+ // InitRequest - handshake request.
+ // ReadRequest - request for data.
+ // CommitOffsetRequest - request for commit of some read data.
+ // PartitionSessionStatusRequest - request for session status
+ // UpdateTokenRequest - request to update auth token
+ //
+ // StartPartitionSessionResponse - Response to StreamReadServerMessage.StartPartitionSessionRequest.
+ // Client signals it is ready to get data from partition.
+ // StopPartitionSessionResponse - Response to StreamReadServerMessage.StopPartitionSessionRequest.
+ // Client signals it has finished working with partition. Mandatory for graceful stop, optional otherwise.
+ message FromClient {
+ oneof client_message {
+ // Client requests.
+ InitRequest init_request = 1;
+ ReadRequest read_request = 2;
+ CommitOffsetRequest commit_offset_request = 3;
+ PartitionSessionStatusRequest partition_session_status_request = 4;
+ UpdateTokenRequest update_token_request = 5;
+
+ // Responses to respective server commands.
+ StartPartitionSessionResponse start_partition_session_response = 6;
+ StopPartitionSessionResponse stop_partition_session_response = 7;
+ }
+ }
+
+ // Server-client message for read session. Contains one of:
+ // InitResponse - handshake response from server.
+ // ReadResponse - portion of data.
+ // CommitOffsetResponse - acknowledgment for commit.
+ // PartitionSessionStatusResponse - server response with partition session status.
+ // UpdateTokenResponse - acknowledgment of token update.
+ //
+ // StartPartitionSessionRequest - command from server to create a partition session.
+ // StopPartitionSessionRequest - command from server to destroy a partition session.
+ message FromServer {
+ // Server status of response.
+ Ydb.StatusIds.StatusCode status = 1;
+
+ // Issues if any.
+ repeated Ydb.Issue.IssueMessage issues = 2;
+
+ oneof server_message {
+ // Responses to respective client requests.
+ InitResponse init_response = 3;
+ ReadResponse read_response = 4;
+ CommitOffsetResponse commit_offset_response = 5;
+ PartitionSessionStatusResponse partition_session_status_response = 6;
+ UpdateTokenResponse update_token_response = 7;
+
+ // Server commands.
+ StartPartitionSessionRequest start_partition_session_request = 8;
+ StopPartitionSessionRequest stop_partition_session_request = 9;
+ }
}
// Handshake request.
@@ -215,150 +267,33 @@ message StreamReadClientMessage {
// Zero means infinite lag.
google.protobuf.Duration max_lag = 3;
// Read data only after this timestamp from this topic.
- google.protobuf.Timestamp start_from_written_at = 4;
- }
- }
-
- // Message that represents client readiness for receiving more data.
- message ReadRequest {
- // Server and client each keep total bytes size of all ReadResponses.
- // When client is ready to receive 'bytes_size' more bytes in responses, it signals it via this ReadRequest.
- // client is ready to receive ReadResponses with total size up to bytes_size.
- // Except the case when the first message in response exceeds it - then it will be delivered.
- // The case when the first message in response exceeds available bytes_size sum, is an exception. Such message will be still delivered.
- // Available sum become negative in this case
- int64 bytes_size = 1;
- }
-
- // Signal for server that cient is ready to recive data for partition.
- message StartPartitionSessionResponse {
- // Partition session identifier of partition to start read.
- int64 partition_session_id = 1;
-
- // Reads in this partition session will start from offset no less than read_offset.
- // If read_offset is set, server will check that read_offset is no less that actual committed offset.
- // If check will fail then server will send a message describing error in status and issues fields, then close stream.
- //
- // If read_offset is not set, no check will be made.
- // InitRequest.max_lag and InitRequest.start_from_written_at could lead to skip of more messages.
- // Server will return data starting from offset that is maximum of actual committed offset, read_offset (if set)
- // and offsets calculated from InitRequest.max_lag and InitRequest.start_from_written_at.
- optional int64 read_offset = 2;
- // All messages with offset less than commit_offset are processed by client. Server will commit this position if this is not done yet.
- optional int64 commit_offset = 3;
- }
-
- // Signal for server that client finished working with this partition. Must be sent only after corresponding Release request from server.
- // Server will give this partition to other read session only after Released signal.
- message StopPartitionSessionResponse {
- // Partition session identifier of partition session that is released by client.
- int64 partition_session_id = 1;
- }
-
- // Signal for server that client processed some read data.
- message CommitOffsetRequest {
- // Partition offsets that indicates processed data.
- repeated PartitionCommitOffset commit_offsets = 1;
-
- // Message that is used for describing commit.
- message PartitionCommitOffset {
- // Identifier of partition session with data to commit.
- int64 partition_session_id = 1;
- // Processed offsets ranges, repeated in case of disjoint ranges.
- repeated OffsetsRange offsets = 2;
+ google.protobuf.Timestamp read_from = 4;
}
}
- message PartitionSessionStatusRequest {
- int64 partition_session_id = 1;
- }
-}
-
-// Within a StreamRead session delivered messages are separated by partition.
-// Reads from a single partition are represented by a partition session.
-message PartitionSession {
- // Identitifier of partition session. Unique inside one RPC call.
- int64 partition_session_id = 1;
- // Topic path of partition.
- string topic = 2;
- // Partition identifier.
- int64 partition_id = 3;
-}
-
-// Response for read session. Contains one of:
-// InitResponse - handshake response from server.
-// ReadResponse - portion of data.
-// StartPartitionSessionRequest - command from server to create a partition session.
-// StopPartitionSessionRequest - command from server to destroy a partition session.
-// CommitOffsetResponse - acknowledgment for commit.
-// PartitionSessionStatusResponse - server response with partition session status.
-// UpdateTokenResponse - acknowledgment of token update.
-message StreamReadServerMessage {
- // Server status of response.
- Ydb.StatusIds.StatusCode status = 1;
-
- // Issues if any.
- repeated Ydb.Issue.IssueMessage issues = 2;
-
- oneof server_message {
- InitResponse init_response = 3;
- ReadResponse read_response = 4;
- StartPartitionSessionRequest start_partition_session_request = 5;
- StopPartitionSessionRequest stop_partition_session_request = 6;
- CommitOffsetResponse commit_offset_response = 7;
- PartitionSessionStatusResponse partition_session_status_response = 8;
- UpdateTokenResponse update_token_response = 9;
- }
-
// Handshake response.
message InitResponse {
// Read session identifier for debug purposes.
string session_id = 1;
}
- // Command to create and start a partition session.
- // Client must react on this signal by sending StartPartitionSessionResponse when ready to recieve data from this partition.
- message StartPartitionSessionRequest {
- // Partition session description.
- PartitionSession partition_session = 1;
-
- // Actual upper bound for all committed offsets.
- // I.e. each offset up to and including (committed_offset - 1) was committed.
- int64 committed_offset = 2;
-
- // First and next-after-last offsets of messages in partition.
- OffsetsRange partition_offsets = 3;
- }
-
- // Command to stop and destroy concrete partition session.
- message StopPartitionSessionRequest {
- // Identifier of partition session that is ready to be closed by server.
- int64 partition_session_id = 1;
-
- // Flag of graceful stop.
- // If True then server is waiting for response from client before giving of this partition for other read session.
- // Server will not send more data from this partition.
- // Client can process all received data and wait for commit and only after send response.
- // If False then server gives partition for other session right now.
- // All further commits for this PartitionSession has no effect. Server is not waiting for response.
- bool graceful = 2;
-
- // Upper bound for committed offsets.
- int64 committed_offset = 3;
- }
-
- // Acknowledgement for commits.
- message CommitOffsetResponse {
- // Partitions with progress.
- repeated PartitionCommittedOffset partitions_committed_offsets = 1;
-
- // Per-partition commit representation.
- message PartitionCommittedOffset {
- // Partition session identifier.
- int64 partition_session_id = 1;
- // Upper bound for committed offsets.
- int64 committed_offset = 2;
- }
+ // Message that represents client readiness for receiving more data.
+ message ReadRequest {
+ // Server and client each keep track of total bytes size of all ReadResponses.
+ // When client is ready to receive N more bytes in responses (to increment possible total by N), it sends a ReadRequest with bytes_size = N.
+ //
+ // Example:
+ // 1) Let client have 200 B buffer. It sends ReadRequest with bytes_size = 200;
+ // 2) Server returns ReadResponse with bytes_size = 150; now client buffer has 50 free bytes, server is free to send up to 50 bytes in responses.
+ // 3) Client processes 100 bytes from buffer, now buffer free space is 150 bytes. Client sends ReadRequest with bytes_size = 100;
+ // 4) Server is free to send up to 50 + 100 = 150 bytes. It may, for example,
+ // send one 70 B response and another 80 B response without receiving additional requests.
+ //
+ // So in expression 'A = (sum of bytes_size in all ReadRequests) - (sum of bytes_size in all ReadResponses)'
+ // server will keep A (available size for responses) non-negative.
+ // But there is an exception. If server receives ReadRequest, and the first message in response exceeds A -
+ // then it will still be delivered, and A will become negative until client sends enough additional ReadRequests.
+ int64 bytes_size = 1;
}
// Data read.
@@ -415,20 +350,109 @@ message StreamReadServerMessage {
int64 bytes_size = 2;
}
+ // Signal for server that client processed some read data.
+ message CommitOffsetRequest {
+ // Partition offsets that indicates processed data.
+ repeated PartitionCommitOffset commit_offsets = 1;
+
+ // Message that is used for describing commit.
+ message PartitionCommitOffset {
+ // Identifier of partition session with data to commit.
+ int64 partition_session_id = 1;
+ // Processed offsets ranges, repeated in case of disjoint ranges.
+ repeated OffsetsRange offsets = 2;
+ }
+ }
+
+ // Acknowledgement for commits.
+ message CommitOffsetResponse {
+ // Partitions with progress.
+ repeated PartitionCommittedOffset partitions_committed_offsets = 1;
+
+ // Per-partition commit representation.
+ message PartitionCommittedOffset {
+ // Partition session identifier.
+ int64 partition_session_id = 1;
+ // Upper bound for committed offsets.
+ int64 committed_offset = 2;
+ }
+ }
+
+ message PartitionSessionStatusRequest {
+ int64 partition_session_id = 1;
+ }
+
// Response for status request.
message PartitionSessionStatusResponse {
// Identifier of partition session whose status was requested.
int64 partition_session_id = 1;
- // First and next-after-last offsets of messages in partition.
+ // Partition contains messages with offsets in range [start, end).
OffsetsRange partition_offsets = 2;
// Upper bound for committed offsets.
+ // TODO example (here or in CommitRequest)
int64 committed_offset = 3;
// Write timestamp of next message written to this partition will be no less than write_time_high_watermark.
google.protobuf.Timestamp write_time_high_watermark = 4;
}
+
+ // Command from server to create and start a partition session.
+ // Client must react on this command by sending StartPartitionSessionResponse when ready to recieve data from this partition.
+ message StartPartitionSessionRequest {
+ // Partition session description.
+ PartitionSession partition_session = 1;
+
+ // Actual upper bound for all committed offsets.
+ // I.e. each offset up to and including (committed_offset - 1) was committed.
+ int64 committed_offset = 2;
+
+ // Partition contains messages with offsets in range [start, end).
+ OffsetsRange partition_offsets = 3;
+ }
+
+ // Signal for server that cient is ready to recive data for partition.
+ message StartPartitionSessionResponse {
+ // Partition session identifier of partition to start read.
+ int64 partition_session_id = 1;
+
+ // Reads in this partition session will start from offset no less than read_offset.
+ // If read_offset is set, server will check that read_offset is no less that actual committed offset.
+ // If check will fail then server will send a message describing error in status and issues fields, then close stream.
+ //
+ // If read_offset is not set, no check will be made.
+ // InitRequest.max_lag and InitRequest.read_from could lead to skip of more messages.
+ // Server will return data starting from offset that is maximum of actual committed offset, read_offset (if set)
+ // and offsets calculated from InitRequest.max_lag and InitRequest.read_from.
+ optional int64 read_offset = 2;
+ // All messages with offset less than commit_offset are processed by client. Server will commit this position if this is not done yet.
+ optional int64 commit_offset = 3;
+ }
+
+ // Command from server to stop and destroy concrete partition session.
+ message StopPartitionSessionRequest {
+ // Identifier of partition session that is ready to be closed by server.
+ int64 partition_session_id = 1;
+
+ // Flag of graceful stop.
+ // If True then server is waiting for response from client before giving of this partition for other read session.
+ // Server will not send more data from this partition.
+ // Client can process all received data and wait for commit and only after send response.
+ // If False then server gives partition for other session right now.
+ // All further commits for this PartitionSession has no effect. Server is not waiting for response.
+ bool graceful = 2;
+
+ // Upper bound for committed offsets.
+ int64 committed_offset = 3;
+ }
+
+ // Signal for server that client finished working with this partition. Must be sent only after corresponding StopPartitionSessionRequest from server.
+ // Server will give this partition to other read session only after StopPartitionSessionResponse signal.
+ message StopPartitionSessionResponse {
+ // Partition session identifier of partition session that is released by client.
+ int64 partition_session_id = 1;
+ }
}
@@ -452,70 +476,58 @@ message DropTopicResult {
// Message for describing topic internals.
message TopicSettings {
- // How many partitions in topic. Must less than database limit. Default limit - 10.
- int32 partitions_count = 1 [(value) = "> 0"];
- oneof retention {
- // How long data in partition should be stored. Must be greater than 0 and less than limit for this database.
- // Default limit - 36 hours.
- google.protobuf.Duration retention_period = 2;
- // How much data in partition should be stored. Must be greater than 0 and less than limit for this database.
- int64 retention_storage_bytes = 14 [(value) = ">= 0"];
+ oneof partitions {
+ // How many uniform partitions in topic. Must less than database limit. Default limit - 10.
+ int32 partitions_count = 1;
+ // 2 reserved for partition_at_keys
}
- // How long last written sequence number for message group should be stored. Must be greater then retention_period and less then limit for this database. Default limit - 16 days.
- google.protobuf.Duration message_group_seq_no_retention_period = 12;
- // How many last written sequence number for various message groups should be stored per partition. Must be less than limit for this database. Default limit - 6*10^6 values.
- int64 max_partition_message_groups_seq_no_stored = 13 [(value) = ">= 0"];
- reserved 3;
- reserved "supported_format";
+ // How long data in partition should be stored. Must be greater than 0 and less than limit for this database.
+ // Default limit - 36 hours.
+ google.protobuf.Duration retention_period = 2;
+ // How much data in partition should be stored. Must be greater than 0 and less than limit for this database.
+ int64 retention_storage_mb = 3;
+ reserved 4; // supported_format.
// List of allowed codecs for writers.
// See enum Codec above for values.
- // Writes with codec not from this list are forbiden.
- repeated int64 supported_codecs = 4 [(size).le = 100];
- // Max storage usage for each topic's partition. Must be less than database limit. Default limit - 130 GB.
- int64 max_partition_storage_size = 5 [(value) = ">= 0"];
+ // Writes with codec not from this list are forbidden.
+ repeated int64 supported_codecs = 5;
// Partition write speed in bytes per second. Must be less than database limit. Default limit - 1 MB/s.
- int64 max_partition_write_speed = 6 [(value) = ">= 0"];
+ int64 partition_write_speed_bytes_per_second = 6;
// Burst size for write in partition, in bytes. Must be less than database limit. Default limit - 1 MB.
- int64 max_partition_write_burst = 7 [(value) = ">= 0"];
+ int64 partition_write_burst_bytes = 7;
- // Disallows client writes. Used for mirrored topics in federation.
- bool client_write_disabled = 8;
- // Message for read rules description.
+ // User and server attributes of topic. Server attributes starts from "_" and will be validated by server.
+ map<string, string> attributes = 8;
+
+ // List of consumers for this topic.
+ repeated Consumer consumers = 9;
+
+ // Consumer description.
message Consumer {
// Must have valid not empty name as a key.
- string name = 1 [(required) = true];
+ string name = 1;
+ // Consumer may be marked as 'important'. It means messages for this consumer will never expire due to retention.
+ // User should take care that such consumer never stalls, to prevent running out of disk space.
// Flag that this consumer is important.
bool important = 2;
- // All messages with smaller timestamp of write will be skipped.
- google.protobuf.Timestamp start_from_written_at = 3;
- reserved 4;
- reserved "supported_format";
+ // All messages with smaller server written_at timestamp will be skipped.
+ google.protobuf.Timestamp read_from = 3;
+ reserved 4; // supported_format
// List of supported codecs by this consumer.
// See enum Codec above for values.
// supported_codecs on topic must be contained inside this list.
- repeated int64 supported_codecs = 5 [(size).le = 100];
-
- // Read rule version. Any non-negative integer.
- int64 version = 6 [(value) = ">= 0"];
+ repeated int64 supported_codecs = 5;
- // Client service type.
- string service_type = 7;
+ // Attributes of consumer
+ map<string, string> attributes = 6;
}
-
- // List of consumer read rules for this topic.
- repeated Consumer consumers = 9 [(size).le = 3000];
-
- // User and server attributes of topic. Server attributes starts from "_" and will be validated by server.
- map<string, string> attributes = 10;
}
// Create topic request sent from client to server.
message CreateTopicRequest {
Ydb.Operations.OperationParams operation_params = 1;
// Topic path.
- // e.g. /database_path/dir/my-topic
- // or /database_path/dir/my-topic.federation-mirrors/sas
- string path = 2 [(required) = true];
+ string path = 2;
// Topic settings.
TopicSettings settings = 4;
}
@@ -535,7 +547,7 @@ message CreateTopicResult {
message AlterTopicRequest {
Ydb.Operations.OperationParams operation_params = 1;
// Topic path.
- string path = 2 [(required) = true];
+ string path = 2;
// New topic settings to be set. All options inside should be set despite same value.
TopicSettings settings = 4;
}
@@ -554,7 +566,7 @@ message AlterTopicResult {
message AddConsumerRequest {
Ydb.Operations.OperationParams operation_params = 1;
// Topic path.
- string topic = 2 [(required) = true];
+ string path = 2;
// consumers to add
TopicSettings.Consumer consumer = 3;
}
@@ -569,23 +581,23 @@ message AddConsumerResponse {
message AddConsumerResult {
}
-// Remove consumer request for existing topic.
-message RemoveConsumerRequest {
+// Drop consumer request for existing topic.
+message DropConsumerRequest {
Ydb.Operations.OperationParams operation_params = 1;
// Topic path.
- string topic = 2 [(required) = true];
- // Name of consumer to remove.
+ string path = 2;
+ // Name of consumer to drop.
string consumer = 3;
}
-// Remove consumer response for existing topic.
-message RemoveConsumerResponse {
+// Drop consumer response for existing topic.
+message DropConsumerResponse {
// Result of request will be inside operation.
Ydb.Operations.Operation operation = 1;
}
-// Remove consumer result message that will be inside RemoveConsumerReponse.operation.
-message RemoveConsumerResult {
+// Drop consumer result message that will be inside DropConsumerReponse.operation.
+message DropConsumerResult {
}
// Describe topic request sent from client to server.
@@ -593,7 +605,7 @@ message DescribeTopicRequest {
Ydb.Operations.OperationParams operation_params = 1;
// Topic path.
- string path = 2 [(required) = true];
+ string path = 2;
// If this message is present, response will include runtime topic statistics.
IncludeStats include_stats = 3;
@@ -612,7 +624,7 @@ message DescribeTopicResponse {
// Describe topic result message that will be inside DescribeTopicResponse.operation.
message DescribeTopicResult {
- // Topic path.
+ // Description of scheme object.
Ydb.Scheme.Entry self = 1;
// Settings of topic.
TopicSettings settings = 2;
@@ -628,24 +640,16 @@ message DescribeTopicResult {
// Patition identifier inside topic.
int64 partition_id = 1;
- // Request status of partition.
- Ydb.StatusIds.StatusCode status = 2;
- // Issues if any.
- repeated Ydb.Issue.IssueMessage issues = 3;
-
- // First and next-after-last offsets of messages in partition.
- OffsetsRange partition_offsets = 4;
+ // Partition contains messages with offsets in range [start, end).
+ OffsetsRange partition_offsets = 2;
// Host name of node where partition leader is running.
- string tablet_node = 5;
+ string tablet_node = 3;
// Statistics of particular consumer, if requested.
- ConsumerStats consumer_stats = 6;
+ ConsumerStats consumer_stats = 4;
message ConsumerStats {
- // Consumer name.
- string name = 1;
-
// Offset of consumer committed message a.k.a. first not processed message.
// If commit_offset == end_offset then all messages from partition are processed.
int64 commit_offset = 2;
@@ -663,8 +667,8 @@ message DescribeTopicResult {
// Session identifier that locked and reading this partition right now.
string session_id = 7;
- // Ip if node that created reading this session.
- string client_node = 8;
+ // Ip of node that created reading this session.
+ string client_ip = 8;
// Host name of proxy node that processing this reading session.
string proxy_node = 9;