aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorildar-khisam <ildar-khisam@yandex-team.ru>2022-06-10 21:17:33 +0300
committerildar-khisam <ildar-khisam@yandex-team.ru>2022-06-10 21:17:33 +0300
commit1618314272e381b58e0558c971970d24ef5c191a (patch)
tree9e6247aed590ccdf203a23a554fff7c023db2f0a
parent84d5626c87e97e75fb24b2fff9249ece342ee1c4 (diff)
downloadydb-1618314272e381b58e0558c971970d24ef5c191a.tar.gz
drafts for public grpc api
Draft for Persqueue Public API v1 ref:254a2b657292686c01c75f97d09fce4531da6edd
-rw-r--r--ydb/public/api/grpc/draft/ydb_topic_v1.proto89
-rw-r--r--ydb/public/api/protos/ydb_topic.proto678
2 files changed, 767 insertions, 0 deletions
diff --git a/ydb/public/api/grpc/draft/ydb_topic_v1.proto b/ydb/public/api/grpc/draft/ydb_topic_v1.proto
new file mode 100644
index 0000000000..d3bb31046a
--- /dev/null
+++ b/ydb/public/api/grpc/draft/ydb_topic_v1.proto
@@ -0,0 +1,89 @@
+syntax = "proto3";
+option cc_enable_arenas = true;
+
+package Ydb.Topic.V1;
+
+option java_package = "com.yandex.ydb.topic.v1";
+
+import "ydb/public/api/protos/ydb_topic.proto";
+
+service TopicService {
+ // Create Write Session
+ // Pipeline example:
+ // client server
+ // InitRequest(Topic, MessageGroupID, ...)
+ // ---------------->
+ // InitResponse(Partition, MaxSeqNo, ...)
+ // <----------------
+ // WriteRequest(data1, seqNo1)
+ // ---------------->
+ // WriteRequest(data2, seqNo2)
+ // ---------------->
+ // WriteResponse(seqNo1, offset1, ...)
+ // <----------------
+ // WriteRequest(data3, seqNo3)
+ // ---------------->
+ // WriteResponse(seqNo2, offset2, ...)
+ // <----------------
+ // [something went wrong] (status != SUCCESS, issues not empty)
+ // <----------------
+ rpc StreamWrite(stream StreamWriteClientMessage) returns (stream StreamWriteServerMessage);
+
+
+ // Create Read Session
+ // Pipeline:
+ // client server
+ // InitRequest(Topics, ClientId, ...)
+ // ---------------->
+ // InitResponse(SessionId)
+ // <----------------
+ // ReadRequest
+ // ---------------->
+ // ReadRequest
+ // ---------------->
+ // StartPartitionSessionRequest(Topic1, Partition1, PartitionSessionID1, ...)
+ // <----------------
+ // StartPartitionSessionRequest(Topic2, Partition2, PartitionSessionID2, ...)
+ // <----------------
+ // StartPartitionSessionResponse(PartitionSessionID1, ...) - client must respond with this message to actually start recieving data messages from this partition
+ // ---------------->
+ // StopPartitionSessionRequest(PartitionSessionID1, ...)
+ // <----------------
+ // StopPartitionSessionResponse(PartitionSessionID1, ...) - only after this response server will give this parittion to other session.
+ // ---------------->
+ // StartPartitionSessionResponse(PartitionSession2, ...)
+ // ---------------->
+ // ReadResponse(data, ...)
+ // <----------------
+ // CommitRequest(PartitionCommit1, ...)
+ // ---------------->
+ // CommitResponse(PartitionCommitAck1, ...)
+ // <----------------
+ // [something went wrong] (status != SUCCESS, issues not empty)
+ // <----------------
+ rpc StreamRead(stream StreamReadClientMessage) returns (stream StreamReadServerMessage);
+
+
+ // Describe topic command.
+ rpc DescribeTopic(DescribeTopicRequest) returns (DescribeTopicResponse);
+
+
+ // Drop topic command.
+ rpc DropTopic(DropTopicRequest) returns (DropTopicResponse);
+
+
+ // Create topic command.
+ rpc CreateTopic(CreateTopicRequest) returns (CreateTopicResponse);
+
+
+ // Alter topic command.
+ rpc AlterTopic(AlterTopicRequest) returns (AlterTopicResponse);
+
+
+ // Add consumer command.
+ rpc AddConsumer(AddConsumerRequest) returns (AddConsumerResponse);
+
+
+ // Remove consumer command.
+ rpc RemoveConsumer(RemoveConsumerRequest) returns (RemoveConsumerResponse);
+}
diff --git a/ydb/public/api/protos/ydb_topic.proto b/ydb/public/api/protos/ydb_topic.proto
new file mode 100644
index 0000000000..bb73c62931
--- /dev/null
+++ b/ydb/public/api/protos/ydb_topic.proto
@@ -0,0 +1,678 @@
+syntax = "proto3";
+import "ydb/public/api/protos/ydb_operation.proto";
+import "ydb/public/api/protos/ydb_scheme.proto";
+import "ydb/public/api/protos/ydb_status_codes.proto";
+import "ydb/public/api/protos/ydb_issue_message.proto";
+import "ydb/public/api/protos/annotations/validation.proto";
+
+import "google/protobuf/timestamp.proto";
+
+package Ydb.Topic;
+
+option java_package = "com.yandex.ydb.topic";
+
+option cc_enable_arenas = true;
+
+enum Codec {
+ CODEC_UNSPECIFIED = 0;
+ CODEC_RAW = 1;
+ CODEC_GZIP = 2;
+ CODEC_LZOP = 3;
+ CODEC_ZSTD = 4;
+}
+
+// Represents range [start, end).
+// I.e. (end - 1) is the greatest of offsets, included in non-empty range.
+message OffsetsRange {
+ int64 start = 1;
+ int64 end = 2;
+}
+
+// In-session reauthentication and reauthorization, lets user increase session lifetime.
+// Client should wait for UpdateTokenResponse before sending next UpdateTokenRequest.
+message UpdateTokenRequest {
+ string token = 1;
+}
+
+message UpdateTokenResponse {
+}
+
+// Request for write session. Contains one of:
+// InitRequest - handshake request.
+// WriteRequest - portion of data to be written.
+// UpdateTokenRequest - user credentials if update is needed.
+message StreamWriteClientMessage {
+ oneof client_message {
+ InitRequest init_request = 1;
+ WriteRequest write_request = 2;
+ UpdateTokenRequest update_token_request = 3;
+ }
+
+ // Handshake request that must be sent to server first.
+ message InitRequest {
+ // Full path of topic to write to.
+ // e.g. /database_path/dir/my-topic
+ // or /database_path/dir/my-topic.federation-mirrors/sas
+ string topic = 1;
+ // Message group identifier of client data stream.
+ string message_group_id = 2;
+ // User metadata attached to this write session.
+ // Reader will get this session meta data with each message read.
+ map<string, string> write_session_meta = 3;
+ // If partition mapping is not provided, partition is chosen by server.
+ oneof partition_mapping {
+ // Explicit partition id to write to.
+ // If not set, then it is chosen by server instead.
+ int64 partition_id = 4;
+ }
+ }
+
+ // Represents portion of client messages.
+ message WriteRequest {
+ repeated MessageData messages = 1;
+
+ // Codec that is used for data compression.
+ // See enum Codec above for values.
+ int64 codec = 2;
+
+ message MessageData {
+ // Message sequence number, provided by client for deduplication.
+ // Starts at 1
+ int64 seq_no = 1;
+ // Creation timestamp
+ google.protobuf.Timestamp created_at = 2;
+ // Compressed client message body.
+ bytes data = 3;
+ // Uncompressed size of client message body.
+ int64 uncompressed_size = 4;
+ }
+ }
+}
+
+// Response for write session. Contains either non-success status, or one of:
+// InitResponse - correct handshake response.
+// WriteResponse - acknowledgment of storing client messages.
+// UpdateTokenResponse - acknowledgment of reauthentication and reauthorization.
+message StreamWriteServerMessage {
+ // Server status of response.
+ Ydb.StatusIds.StatusCode status = 1;
+
+ // Issues if any.
+ repeated Ydb.Issue.IssueMessage issues = 2;
+
+ oneof server_message {
+ InitResponse init_response = 3;
+ WriteResponse write_response = 4;
+ UpdateTokenResponse update_token_response = 5;
+ }
+
+ // Response for handshake.
+ message InitResponse {
+ // Last persisted message's sequence number for this message group.
+ // Zero for new message group.
+ int64 last_seq_no = 1;
+ // Unique identifier of write session. Used for debug purposes.
+ string session_id = 2;
+ // Identifier of partition that is matched for this write session.
+ int64 partition_id = 3;
+
+ // Client can only use compression codecs from this set to write messages to topic, session will be closed with BAD_REQUEST otherwise.
+ // See enum Codec above for values.
+ repeated int64 supported_codecs = 10;
+ }
+
+ // Message that represents acknowledgment for sequence of client messages.
+ // This sequence is persisted together so write statistics is for messages batch.
+ message WriteResponse {
+ // Number of acks is equal to number of messages in the corresponding WriteRequests.
+ repeated WriteAck acks = 1;
+
+ // Assigned partition for all client messages inside this batch.
+ // This actual partition may differ from that returned in InitResponse or other WriteResponses in this write session.
+ int64 partition_id = 2;
+
+ // Write statistics for this sequence of client messages.
+ WriteStatistics write_statistics = 3;
+
+ // Acknowledgment for one persistently written message.
+ message WriteAck {
+ // Sequence number as in WriteRequest.
+ int64 seq_no = 1;
+
+ // Either message is written for the first time or duplicate.
+ oneof message_write_status {
+ Written written = 2;
+ Skipped skipped = 3;
+ }
+
+ message Written {
+ // Assigned partition offset.
+ int64 offset = 1;
+ }
+
+ message Skipped {
+ enum Reason {
+ REASON_UNSPECIFIED = 0;
+ REASON_ALREADY_WRITTEN = 1;
+ }
+
+ // See enum Reason above for values.
+ int64 reason = 1;
+ }
+ }
+
+ // Message with write statistics.
+ message WriteStatistics {
+ // Time spent in persisting of data. Same for each message in response.
+ google.protobuf.Duration persist = 1;
+ // Time spent in queue before persisting, minimal of all messages in response.
+ google.protobuf.Duration queued_in_partition_min = 2;
+ // Time spent in queue before persisting, maximal of all messages in response.
+ google.protobuf.Duration queued_in_partition_max = 3;
+ // Time spent awaiting for partition write quota. Same for each message in response.
+ google.protobuf.Duration throttled_on_partition = 4;
+ // Time spent awaiting for topic write quota. Same for each message in response.
+ google.protobuf.Duration throttled_on_topic = 5;
+ }
+ }
+}
+
+
+// Request for read session. Contains one of:
+// InitRequest - handshake request.
+// ReadRequest - request for data.
+// CommitOffsetRequest - request for commit of some read data.
+// StartPartitionSessionResponse - signal for server that client is ready to get data from partition.
+// StopPartitionSessionResponse - signal for server that client finished working with partition. Must be sent only after corresponding request from server.
+// PartitionSessionStatusRequest - request for session status
+// UpdateTokenRequest - request to update auth token
+message StreamReadClientMessage {
+ oneof client_message {
+ InitRequest init_request = 1;
+ ReadRequest read_request = 2;
+ StartPartitionSessionResponse start_partition_session_response = 3;
+ StopPartitionSessionResponse stop_partition_session_response = 4;
+ CommitOffsetRequest commit_offset_request = 5;
+ PartitionSessionStatusRequest partition_session_status_request = 6;
+ UpdateTokenRequest update_token_request = 7;
+ }
+
+ // Handshake request.
+ message InitRequest {
+ // Message that describes topic to read.
+ // Topics that will be read by this session.
+ repeated TopicReadSettings topics_read_settings = 1;
+ // Path of consumer that is used for reading by this session.
+ string consumer = 2;
+
+ message TopicReadSettings {
+ // Topic path.
+ string path = 1;
+ // Partitions that will be read by this session.
+ // If list is empty - then session will read all partitions.
+ repeated int64 partition_ids = 2;
+ // Skip all messages that has write timestamp smaller than now - max_lag.
+ // Zero means infinite lag.
+ google.protobuf.Duration max_lag = 3;
+ // Read data only after this timestamp from this topic.
+ google.protobuf.Timestamp start_from_written_at = 4;
+ }
+ }
+
+ // Message that represents client readiness for receiving more data.
+ message ReadRequest {
+ // Server and client each keep total bytes size of all ReadResponses.
+ // When client is ready to receive 'bytes_size' more bytes in responses, it signals it via this ReadRequest.
+ // client is ready to receive ReadResponses with total size up to bytes_size.
+ // Except the case when the first message in response exceeds it - then it will be delivered.
+ // The case when the first message in response exceeds available bytes_size sum, is an exception. Such message will be still delivered.
+ // Available sum become negative in this case
+ int64 bytes_size = 1;
+ }
+
+ // Signal for server that cient is ready to recive data for partition.
+ message StartPartitionSessionResponse {
+ // Partition session identifier of partition to start read.
+ int64 partition_session_id = 1;
+
+ // Reads in this partition session will start from offset no less than read_offset.
+ // If read_offset is set, server will check that read_offset is no less that actual committed offset.
+ // If check will fail then server will send a message describing error in status and issues fields, then close stream.
+ //
+ // If read_offset is not set, no check will be made.
+ // InitRequest.max_lag and InitRequest.start_from_written_at could lead to skip of more messages.
+ // Server will return data starting from offset that is maximum of actual committed offset, read_offset (if set)
+ // and offsets calculated from InitRequest.max_lag and InitRequest.start_from_written_at.
+ optional int64 read_offset = 2;
+ // All messages with offset less than commit_offset are processed by client. Server will commit this position if this is not done yet.
+ optional int64 commit_offset = 3;
+ }
+
+ // Signal for server that client finished working with this partition. Must be sent only after corresponding Release request from server.
+ // Server will give this partition to other read session only after Released signal.
+ message StopPartitionSessionResponse {
+ // Partition session identifier of partition session that is released by client.
+ int64 partition_session_id = 1;
+ }
+
+ // Signal for server that client processed some read data.
+ message CommitOffsetRequest {
+ // Partition offsets that indicates processed data.
+ repeated PartitionCommitOffset commit_offsets = 1;
+
+ // Message that is used for describing commit.
+ message PartitionCommitOffset {
+ // Identifier of partition session with data to commit.
+ int64 partition_session_id = 1;
+ // Processed offsets ranges, repeated in case of disjoint ranges.
+ repeated OffsetsRange offsets = 2;
+ }
+ }
+
+ message PartitionSessionStatusRequest {
+ int64 partition_session_id = 1;
+ }
+}
+
+// Within a StreamRead session delivered messages are separated by partition.
+// Reads from a single partition are represented by a partition session.
+message PartitionSession {
+ // Identitifier of partition session. Unique inside one RPC call.
+ int64 partition_session_id = 1;
+ // Topic path of partition.
+ string topic = 2;
+ // Partition identifier.
+ int64 partition_id = 3;
+}
+
+// Response for read session. Contains one of:
+// InitResponse - handshake response from server.
+// ReadResponse - portion of data.
+// StartPartitionSessionRequest - command from server to create a partition session.
+// StopPartitionSessionRequest - command from server to destroy a partition session.
+// CommitOffsetResponse - acknowledgment for commit.
+// PartitionSessionStatusResponse - server response with partition session status.
+// UpdateTokenResponse - acknowledgment of token update.
+message StreamReadServerMessage {
+ // Server status of response.
+ Ydb.StatusIds.StatusCode status = 1;
+
+ // Issues if any.
+ repeated Ydb.Issue.IssueMessage issues = 2;
+
+ oneof server_message {
+ InitResponse init_response = 3;
+ ReadResponse read_response = 4;
+ StartPartitionSessionRequest start_partition_session_request = 5;
+ StopPartitionSessionRequest stop_partition_session_request = 6;
+ CommitOffsetResponse commit_offset_response = 7;
+ PartitionSessionStatusResponse partition_session_status_response = 8;
+ UpdateTokenResponse update_token_response = 9;
+ }
+
+ // Handshake response.
+ message InitResponse {
+ // Read session identifier for debug purposes.
+ string session_id = 1;
+ }
+
+ // Command to create and start a partition session.
+ // Client must react on this signal by sending StartPartitionSessionResponse when ready to recieve data from this partition.
+ message StartPartitionSessionRequest {
+ // Partition session description.
+ PartitionSession partition_session = 1;
+
+ // Actual upper bound for all committed offsets.
+ // I.e. each offset up to and including (committed_offset - 1) was committed.
+ int64 committed_offset = 2;
+
+ // First and next-after-last offsets of messages in partition.
+ OffsetsRange partition_offsets = 3;
+ }
+
+ // Command to stop and destroy concrete partition session.
+ message StopPartitionSessionRequest {
+ // Identifier of partition session that is ready to be closed by server.
+ int64 partition_session_id = 1;
+
+ // Flag of graceful stop.
+ // If True then server is waiting for response from client before giving of this partition for other read session.
+ // Server will not send more data from this partition.
+ // Client can process all received data and wait for commit and only after send response.
+ // If False then server gives partition for other session right now.
+ // All further commits for this PartitionSession has no effect. Server is not waiting for response.
+ bool graceful = 2;
+
+ // Upper bound for committed offsets.
+ int64 committed_offset = 3;
+ }
+
+ // Acknowledgement for commits.
+ message CommitOffsetResponse {
+ // Partitions with progress.
+ repeated PartitionCommittedOffset partitions_committed_offsets = 1;
+
+ // Per-partition commit representation.
+ message PartitionCommittedOffset {
+ // Partition session identifier.
+ int64 partition_session_id = 1;
+ // Upper bound for committed offsets.
+ int64 committed_offset = 2;
+ }
+ }
+
+ // Data read.
+ message ReadResponse {
+ // One client message representation.
+ message MessageData {
+ // Partition offset in partition that assigned for message.
+ int64 offset = 1; //unique value for clientside deduplication - Topic:Partition:Offset
+ // Sequence number that provided with message on write from client.
+ int64 seq_no = 2;
+ // Timestamp of creation of message provided on write from client.
+ google.protobuf.Timestamp created_at = 3;
+ // Compressed client message body.
+ bytes data = 5;
+ // Uncompressed size of client message body.
+ int64 uncompressed_size = 6;
+
+ // Filled if partition_key / hash was used on message write.
+ string partition_key = 7;
+ bytes explicit_hash = 8;
+ }
+
+ // Representation of sequence of client messages from one write session.
+ message Batch {
+ // List of client messages.
+ repeated MessageData message_data = 1;
+
+ // Source identifier provided by client for this batch of client messages.
+ string message_group_id = 2;
+ // Client metadata attached to write session, the same for all messages in batch.
+ map<string, string> write_session_meta = 3;
+
+ // Codec that is used for data compression.
+ // See enum Codec above for values.
+ int64 codec = 4;
+
+ // Persist timestamp on server for batch.
+ google.protobuf.Timestamp written_at = 5;
+ }
+
+ // Representation of sequence of messages from one partition.
+ message PartitionData {
+ int64 partition_session_id = 1;
+
+ // Client messages, divided by write sessions.
+ repeated Batch batches = 2;
+ }
+
+ // Client messages, divided by partitions.
+ repeated PartitionData partition_data = 1;
+
+ // Total size in bytes of this response as calculated by server.
+ // See ReadRequest comment above.
+ int64 bytes_size = 2;
+ }
+
+ // Response for status request.
+ message PartitionSessionStatusResponse {
+ // Identifier of partition session whose status was requested.
+ int64 partition_session_id = 1;
+
+ // First and next-after-last offsets of messages in partition.
+ OffsetsRange partition_offsets = 2;
+
+ // Upper bound for committed offsets.
+ int64 committed_offset = 3;
+
+ // Write timestamp of next message written to this partition will be no less than write_time_high_watermark.
+ google.protobuf.Timestamp write_time_high_watermark = 4;
+ }
+}
+
+
+// Drop topic request sent from client to server.
+message DropTopicRequest {
+ Ydb.Operations.OperationParams operation_params = 1;
+ // Topic path.
+ string path = 2;
+}
+
+
+// Drop topic response sent from server to client. If topic is not existed then response status will be "SCHEME_ERROR".
+message DropTopicResponse {
+ // Result of request will be inside operation.
+ Ydb.Operations.Operation operation = 1;
+}
+
+// Drop topic result message that will be inside DropTopicResponse.operation.
+message DropTopicResult {
+}
+
+// Message for describing topic internals.
+message TopicSettings {
+ // How many partitions in topic. Must less than database limit. Default limit - 10.
+ int32 partitions_count = 1 [(value) = "> 0"];
+ oneof retention {
+ // How long data in partition should be stored. Must be greater than 0 and less than limit for this database.
+ // Default limit - 36 hours.
+ google.protobuf.Duration retention_period = 2;
+ // How much data in partition should be stored. Must be greater than 0 and less than limit for this database.
+ int64 retention_storage_bytes = 14 [(value) = ">= 0"];
+ }
+ // How long last written sequence number for message group should be stored. Must be greater then retention_period and less then limit for this database. Default limit - 16 days.
+ google.protobuf.Duration message_group_seq_no_retention_period = 12;
+ // How many last written sequence number for various message groups should be stored per partition. Must be less than limit for this database. Default limit - 6*10^6 values.
+ int64 max_partition_message_groups_seq_no_stored = 13 [(value) = ">= 0"];
+ reserved 3;
+ reserved "supported_format";
+ // List of allowed codecs for writers.
+ // See enum Codec above for values.
+ // Writes with codec not from this list are forbiden.
+ repeated int64 supported_codecs = 4 [(size).le = 100];
+ // Max storage usage for each topic's partition. Must be less than database limit. Default limit - 130 GB.
+ int64 max_partition_storage_size = 5 [(value) = ">= 0"];
+ // Partition write speed in bytes per second. Must be less than database limit. Default limit - 1 MB/s.
+ int64 max_partition_write_speed = 6 [(value) = ">= 0"];
+ // Burst size for write in partition, in bytes. Must be less than database limit. Default limit - 1 MB.
+ int64 max_partition_write_burst = 7 [(value) = ">= 0"];
+
+ // Disallows client writes. Used for mirrored topics in federation.
+ bool client_write_disabled = 8;
+ // Message for read rules description.
+ message Consumer {
+ // Must have valid not empty name as a key.
+ string name = 1 [(required) = true];
+ // Flag that this consumer is important.
+ bool important = 2;
+ // All messages with smaller timestamp of write will be skipped.
+ google.protobuf.Timestamp start_from_written_at = 3;
+ reserved 4;
+ reserved "supported_format";
+ // List of supported codecs by this consumer.
+ // See enum Codec above for values.
+ // supported_codecs on topic must be contained inside this list.
+ repeated int64 supported_codecs = 5 [(size).le = 100];
+
+ // Read rule version. Any non-negative integer.
+ int64 version = 6 [(value) = ">= 0"];
+
+ // Client service type.
+ string service_type = 7;
+ }
+
+ // List of consumer read rules for this topic.
+ repeated Consumer consumers = 9 [(size).le = 3000];
+
+ // User and server attributes of topic. Server attributes starts from "_" and will be validated by server.
+ map<string, string> attributes = 10;
+}
+
+// Create topic request sent from client to server.
+message CreateTopicRequest {
+ Ydb.Operations.OperationParams operation_params = 1;
+ // Topic path.
+ // e.g. /database_path/dir/my-topic
+ // or /database_path/dir/my-topic.federation-mirrors/sas
+ string path = 2 [(required) = true];
+ // Topic settings.
+ TopicSettings settings = 4;
+}
+
+
+// Create topic response sent from server to client. If topic is already exists then response status will be "ALREADY_EXISTS".
+message CreateTopicResponse {
+ // Result of request will be inside operation.
+ Ydb.Operations.Operation operation = 1;
+}
+
+// Create topic result message that will be inside CreateTopicResponse.operation.
+message CreateTopicResult {
+}
+
+// Update existing topic request sent from client to server.
+message AlterTopicRequest {
+ Ydb.Operations.OperationParams operation_params = 1;
+ // Topic path.
+ string path = 2 [(required) = true];
+ // New topic settings to be set. All options inside should be set despite same value.
+ TopicSettings settings = 4;
+}
+
+// Update topic response sent from server to client.
+message AlterTopicResponse {
+ // Result of request will be inside operation.
+ Ydb.Operations.Operation operation = 1;
+}
+
+// Update topic result message that will be inside UpdateTopicResponse.operation.
+message AlterTopicResult {
+}
+
+// Add consumer for existing topic request.
+message AddConsumerRequest {
+ Ydb.Operations.OperationParams operation_params = 1;
+ // Topic path.
+ string topic = 2 [(required) = true];
+ // consumers to add
+ TopicSettings.Consumer consumer = 3;
+}
+
+// Add consumer for existing topic response.
+message AddConsumerResponse {
+ // Result of request will be inside operation.
+ Ydb.Operations.Operation operation = 1;
+}
+
+// Add consumer result message that will be inside AddConsumerReponse.operation.
+message AddConsumerResult {
+}
+
+// Remove consumer request for existing topic.
+message RemoveConsumerRequest {
+ Ydb.Operations.OperationParams operation_params = 1;
+ // Topic path.
+ string topic = 2 [(required) = true];
+ // Name of consumer to remove.
+ string consumer = 3;
+}
+
+// Remove consumer response for existing topic.
+message RemoveConsumerResponse {
+ // Result of request will be inside operation.
+ Ydb.Operations.Operation operation = 1;
+}
+
+// Remove consumer result message that will be inside RemoveConsumerReponse.operation.
+message RemoveConsumerResult {
+}
+
+// Describe topic request sent from client to server.
+message DescribeTopicRequest {
+ Ydb.Operations.OperationParams operation_params = 1;
+
+ // Topic path.
+ string path = 2 [(required) = true];
+
+ // If this message is present, response will include runtime topic statistics.
+ IncludeStats include_stats = 3;
+
+ message IncludeStats {
+ // Consumer statistics for reading this topic may be included in response.
+ string consumer = 1;
+ }
+}
+
+// Describe topic response sent from server to client. If topic is not existed then response status will be "SCHEME_ERROR".
+message DescribeTopicResponse {
+ // Result of request will be inside operation.
+ Ydb.Operations.Operation operation = 1;
+}
+
+// Describe topic result message that will be inside DescribeTopicResponse.operation.
+message DescribeTopicResult {
+ // Topic path.
+ Ydb.Scheme.Entry self = 1;
+ // Settings of topic.
+ TopicSettings settings = 2;
+
+ // Message containing information about concrete topic reading, if requested.
+ TopicStats topic_stats = 3;
+
+ message TopicStats {
+ repeated PartitionStats partition_stats = 1;
+
+ // Message containing information about concrete topic's partition reading.
+ message PartitionStats {
+ // Patition identifier inside topic.
+ int64 partition_id = 1;
+
+ // Request status of partition.
+ Ydb.StatusIds.StatusCode status = 2;
+ // Issues if any.
+ repeated Ydb.Issue.IssueMessage issues = 3;
+
+ // First and next-after-last offsets of messages in partition.
+ OffsetsRange partition_offsets = 4;
+
+ // Host name of node where partition leader is running.
+ string tablet_node = 5;
+
+ // Statistics of particular consumer, if requested.
+ ConsumerStats consumer_stats = 6;
+
+ message ConsumerStats {
+ // Consumer name.
+ string name = 1;
+
+ // Offset of consumer committed message a.k.a. first not processed message.
+ // If commit_offset == end_offset then all messages from partition are processed.
+ int64 commit_offset = 2;
+ // Consumer lag in time between committed and last messages in partition.
+ int64 commit_time_lag_ms = 3;
+
+ // Offset ranges client wants to commit, but server is waiting for commits of gaps.
+ repeated OffsetsRange out_of_order_commit_offset_ranges = 4;
+
+ // Offset of first not read message by consumer from this partition.
+ // read_offset can be bigger that committed_offset - consumer could read some messages but not yet commit them.
+ int64 read_offset = 5;
+ // Consumer lag in time between read and last messages in partition.
+ int64 read_time_lag_ms = 6;
+
+ // Session identifier that locked and reading this partition right now.
+ string session_id = 7;
+ // Ip if node that created reading this session.
+ string client_node = 8;
+ // Host name of proxy node that processing this reading session.
+ string proxy_node = 9;
+
+ // Assign identifier of actual partition assignment.
+ int64 partition_session_id = 10;
+ // Timestamp of partition session start.
+ google.protobuf.Timestamp partition_session_started_at = 11;
+ }
+ }
+ }
+}