diff options
author | ildar-khisam <ildar-khisam@yandex-team.ru> | 2022-06-10 21:17:33 +0300 |
---|---|---|
committer | ildar-khisam <ildar-khisam@yandex-team.ru> | 2022-06-10 21:17:33 +0300 |
commit | 1618314272e381b58e0558c971970d24ef5c191a (patch) | |
tree | 9e6247aed590ccdf203a23a554fff7c023db2f0a | |
parent | 84d5626c87e97e75fb24b2fff9249ece342ee1c4 (diff) | |
download | ydb-1618314272e381b58e0558c971970d24ef5c191a.tar.gz |
drafts for public grpc api
Draft for Persqueue Public API v1
ref:254a2b657292686c01c75f97d09fce4531da6edd
-rw-r--r-- | ydb/public/api/grpc/draft/ydb_topic_v1.proto | 89 | ||||
-rw-r--r-- | ydb/public/api/protos/ydb_topic.proto | 678 |
2 files changed, 767 insertions, 0 deletions
diff --git a/ydb/public/api/grpc/draft/ydb_topic_v1.proto b/ydb/public/api/grpc/draft/ydb_topic_v1.proto new file mode 100644 index 0000000000..d3bb31046a --- /dev/null +++ b/ydb/public/api/grpc/draft/ydb_topic_v1.proto @@ -0,0 +1,89 @@ +syntax = "proto3"; +option cc_enable_arenas = true; + +package Ydb.Topic.V1; + +option java_package = "com.yandex.ydb.topic.v1"; + +import "ydb/public/api/protos/ydb_topic.proto"; + +service TopicService { + // Create Write Session + // Pipeline example: + // client server + // InitRequest(Topic, MessageGroupID, ...) + // ----------------> + // InitResponse(Partition, MaxSeqNo, ...) + // <---------------- + // WriteRequest(data1, seqNo1) + // ----------------> + // WriteRequest(data2, seqNo2) + // ----------------> + // WriteResponse(seqNo1, offset1, ...) + // <---------------- + // WriteRequest(data3, seqNo3) + // ----------------> + // WriteResponse(seqNo2, offset2, ...) + // <---------------- + // [something went wrong] (status != SUCCESS, issues not empty) + // <---------------- + rpc StreamWrite(stream StreamWriteClientMessage) returns (stream StreamWriteServerMessage); + + + // Create Read Session + // Pipeline: + // client server + // InitRequest(Topics, ClientId, ...) + // ----------------> + // InitResponse(SessionId) + // <---------------- + // ReadRequest + // ----------------> + // ReadRequest + // ----------------> + // StartPartitionSessionRequest(Topic1, Partition1, PartitionSessionID1, ...) + // <---------------- + // StartPartitionSessionRequest(Topic2, Partition2, PartitionSessionID2, ...) + // <---------------- + // StartPartitionSessionResponse(PartitionSessionID1, ...) - client must respond with this message to actually start recieving data messages from this partition + // ----------------> + // StopPartitionSessionRequest(PartitionSessionID1, ...) + // <---------------- + // StopPartitionSessionResponse(PartitionSessionID1, ...) - only after this response server will give this parittion to other session. + // ----------------> + // StartPartitionSessionResponse(PartitionSession2, ...) + // ----------------> + // ReadResponse(data, ...) + // <---------------- + // CommitRequest(PartitionCommit1, ...) + // ----------------> + // CommitResponse(PartitionCommitAck1, ...) + // <---------------- + // [something went wrong] (status != SUCCESS, issues not empty) + // <---------------- + rpc StreamRead(stream StreamReadClientMessage) returns (stream StreamReadServerMessage); + + + // Describe topic command. + rpc DescribeTopic(DescribeTopicRequest) returns (DescribeTopicResponse); + + + // Drop topic command. + rpc DropTopic(DropTopicRequest) returns (DropTopicResponse); + + + // Create topic command. + rpc CreateTopic(CreateTopicRequest) returns (CreateTopicResponse); + + + // Alter topic command. + rpc AlterTopic(AlterTopicRequest) returns (AlterTopicResponse); + + + // Add consumer command. + rpc AddConsumer(AddConsumerRequest) returns (AddConsumerResponse); + + + // Remove consumer command. + rpc RemoveConsumer(RemoveConsumerRequest) returns (RemoveConsumerResponse); +} diff --git a/ydb/public/api/protos/ydb_topic.proto b/ydb/public/api/protos/ydb_topic.proto new file mode 100644 index 0000000000..bb73c62931 --- /dev/null +++ b/ydb/public/api/protos/ydb_topic.proto @@ -0,0 +1,678 @@ +syntax = "proto3"; +import "ydb/public/api/protos/ydb_operation.proto"; +import "ydb/public/api/protos/ydb_scheme.proto"; +import "ydb/public/api/protos/ydb_status_codes.proto"; +import "ydb/public/api/protos/ydb_issue_message.proto"; +import "ydb/public/api/protos/annotations/validation.proto"; + +import "google/protobuf/timestamp.proto"; + +package Ydb.Topic; + +option java_package = "com.yandex.ydb.topic"; + +option cc_enable_arenas = true; + +enum Codec { + CODEC_UNSPECIFIED = 0; + CODEC_RAW = 1; + CODEC_GZIP = 2; + CODEC_LZOP = 3; + CODEC_ZSTD = 4; +} + +// Represents range [start, end). +// I.e. (end - 1) is the greatest of offsets, included in non-empty range. +message OffsetsRange { + int64 start = 1; + int64 end = 2; +} + +// In-session reauthentication and reauthorization, lets user increase session lifetime. +// Client should wait for UpdateTokenResponse before sending next UpdateTokenRequest. +message UpdateTokenRequest { + string token = 1; +} + +message UpdateTokenResponse { +} + +// Request for write session. Contains one of: +// InitRequest - handshake request. +// WriteRequest - portion of data to be written. +// UpdateTokenRequest - user credentials if update is needed. +message StreamWriteClientMessage { + oneof client_message { + InitRequest init_request = 1; + WriteRequest write_request = 2; + UpdateTokenRequest update_token_request = 3; + } + + // Handshake request that must be sent to server first. + message InitRequest { + // Full path of topic to write to. + // e.g. /database_path/dir/my-topic + // or /database_path/dir/my-topic.federation-mirrors/sas + string topic = 1; + // Message group identifier of client data stream. + string message_group_id = 2; + // User metadata attached to this write session. + // Reader will get this session meta data with each message read. + map<string, string> write_session_meta = 3; + // If partition mapping is not provided, partition is chosen by server. + oneof partition_mapping { + // Explicit partition id to write to. + // If not set, then it is chosen by server instead. + int64 partition_id = 4; + } + } + + // Represents portion of client messages. + message WriteRequest { + repeated MessageData messages = 1; + + // Codec that is used for data compression. + // See enum Codec above for values. + int64 codec = 2; + + message MessageData { + // Message sequence number, provided by client for deduplication. + // Starts at 1 + int64 seq_no = 1; + // Creation timestamp + google.protobuf.Timestamp created_at = 2; + // Compressed client message body. + bytes data = 3; + // Uncompressed size of client message body. + int64 uncompressed_size = 4; + } + } +} + +// Response for write session. Contains either non-success status, or one of: +// InitResponse - correct handshake response. +// WriteResponse - acknowledgment of storing client messages. +// UpdateTokenResponse - acknowledgment of reauthentication and reauthorization. +message StreamWriteServerMessage { + // Server status of response. + Ydb.StatusIds.StatusCode status = 1; + + // Issues if any. + repeated Ydb.Issue.IssueMessage issues = 2; + + oneof server_message { + InitResponse init_response = 3; + WriteResponse write_response = 4; + UpdateTokenResponse update_token_response = 5; + } + + // Response for handshake. + message InitResponse { + // Last persisted message's sequence number for this message group. + // Zero for new message group. + int64 last_seq_no = 1; + // Unique identifier of write session. Used for debug purposes. + string session_id = 2; + // Identifier of partition that is matched for this write session. + int64 partition_id = 3; + + // Client can only use compression codecs from this set to write messages to topic, session will be closed with BAD_REQUEST otherwise. + // See enum Codec above for values. + repeated int64 supported_codecs = 10; + } + + // Message that represents acknowledgment for sequence of client messages. + // This sequence is persisted together so write statistics is for messages batch. + message WriteResponse { + // Number of acks is equal to number of messages in the corresponding WriteRequests. + repeated WriteAck acks = 1; + + // Assigned partition for all client messages inside this batch. + // This actual partition may differ from that returned in InitResponse or other WriteResponses in this write session. + int64 partition_id = 2; + + // Write statistics for this sequence of client messages. + WriteStatistics write_statistics = 3; + + // Acknowledgment for one persistently written message. + message WriteAck { + // Sequence number as in WriteRequest. + int64 seq_no = 1; + + // Either message is written for the first time or duplicate. + oneof message_write_status { + Written written = 2; + Skipped skipped = 3; + } + + message Written { + // Assigned partition offset. + int64 offset = 1; + } + + message Skipped { + enum Reason { + REASON_UNSPECIFIED = 0; + REASON_ALREADY_WRITTEN = 1; + } + + // See enum Reason above for values. + int64 reason = 1; + } + } + + // Message with write statistics. + message WriteStatistics { + // Time spent in persisting of data. Same for each message in response. + google.protobuf.Duration persist = 1; + // Time spent in queue before persisting, minimal of all messages in response. + google.protobuf.Duration queued_in_partition_min = 2; + // Time spent in queue before persisting, maximal of all messages in response. + google.protobuf.Duration queued_in_partition_max = 3; + // Time spent awaiting for partition write quota. Same for each message in response. + google.protobuf.Duration throttled_on_partition = 4; + // Time spent awaiting for topic write quota. Same for each message in response. + google.protobuf.Duration throttled_on_topic = 5; + } + } +} + + +// Request for read session. Contains one of: +// InitRequest - handshake request. +// ReadRequest - request for data. +// CommitOffsetRequest - request for commit of some read data. +// StartPartitionSessionResponse - signal for server that client is ready to get data from partition. +// StopPartitionSessionResponse - signal for server that client finished working with partition. Must be sent only after corresponding request from server. +// PartitionSessionStatusRequest - request for session status +// UpdateTokenRequest - request to update auth token +message StreamReadClientMessage { + oneof client_message { + InitRequest init_request = 1; + ReadRequest read_request = 2; + StartPartitionSessionResponse start_partition_session_response = 3; + StopPartitionSessionResponse stop_partition_session_response = 4; + CommitOffsetRequest commit_offset_request = 5; + PartitionSessionStatusRequest partition_session_status_request = 6; + UpdateTokenRequest update_token_request = 7; + } + + // Handshake request. + message InitRequest { + // Message that describes topic to read. + // Topics that will be read by this session. + repeated TopicReadSettings topics_read_settings = 1; + // Path of consumer that is used for reading by this session. + string consumer = 2; + + message TopicReadSettings { + // Topic path. + string path = 1; + // Partitions that will be read by this session. + // If list is empty - then session will read all partitions. + repeated int64 partition_ids = 2; + // Skip all messages that has write timestamp smaller than now - max_lag. + // Zero means infinite lag. + google.protobuf.Duration max_lag = 3; + // Read data only after this timestamp from this topic. + google.protobuf.Timestamp start_from_written_at = 4; + } + } + + // Message that represents client readiness for receiving more data. + message ReadRequest { + // Server and client each keep total bytes size of all ReadResponses. + // When client is ready to receive 'bytes_size' more bytes in responses, it signals it via this ReadRequest. + // client is ready to receive ReadResponses with total size up to bytes_size. + // Except the case when the first message in response exceeds it - then it will be delivered. + // The case when the first message in response exceeds available bytes_size sum, is an exception. Such message will be still delivered. + // Available sum become negative in this case + int64 bytes_size = 1; + } + + // Signal for server that cient is ready to recive data for partition. + message StartPartitionSessionResponse { + // Partition session identifier of partition to start read. + int64 partition_session_id = 1; + + // Reads in this partition session will start from offset no less than read_offset. + // If read_offset is set, server will check that read_offset is no less that actual committed offset. + // If check will fail then server will send a message describing error in status and issues fields, then close stream. + // + // If read_offset is not set, no check will be made. + // InitRequest.max_lag and InitRequest.start_from_written_at could lead to skip of more messages. + // Server will return data starting from offset that is maximum of actual committed offset, read_offset (if set) + // and offsets calculated from InitRequest.max_lag and InitRequest.start_from_written_at. + optional int64 read_offset = 2; + // All messages with offset less than commit_offset are processed by client. Server will commit this position if this is not done yet. + optional int64 commit_offset = 3; + } + + // Signal for server that client finished working with this partition. Must be sent only after corresponding Release request from server. + // Server will give this partition to other read session only after Released signal. + message StopPartitionSessionResponse { + // Partition session identifier of partition session that is released by client. + int64 partition_session_id = 1; + } + + // Signal for server that client processed some read data. + message CommitOffsetRequest { + // Partition offsets that indicates processed data. + repeated PartitionCommitOffset commit_offsets = 1; + + // Message that is used for describing commit. + message PartitionCommitOffset { + // Identifier of partition session with data to commit. + int64 partition_session_id = 1; + // Processed offsets ranges, repeated in case of disjoint ranges. + repeated OffsetsRange offsets = 2; + } + } + + message PartitionSessionStatusRequest { + int64 partition_session_id = 1; + } +} + +// Within a StreamRead session delivered messages are separated by partition. +// Reads from a single partition are represented by a partition session. +message PartitionSession { + // Identitifier of partition session. Unique inside one RPC call. + int64 partition_session_id = 1; + // Topic path of partition. + string topic = 2; + // Partition identifier. + int64 partition_id = 3; +} + +// Response for read session. Contains one of: +// InitResponse - handshake response from server. +// ReadResponse - portion of data. +// StartPartitionSessionRequest - command from server to create a partition session. +// StopPartitionSessionRequest - command from server to destroy a partition session. +// CommitOffsetResponse - acknowledgment for commit. +// PartitionSessionStatusResponse - server response with partition session status. +// UpdateTokenResponse - acknowledgment of token update. +message StreamReadServerMessage { + // Server status of response. + Ydb.StatusIds.StatusCode status = 1; + + // Issues if any. + repeated Ydb.Issue.IssueMessage issues = 2; + + oneof server_message { + InitResponse init_response = 3; + ReadResponse read_response = 4; + StartPartitionSessionRequest start_partition_session_request = 5; + StopPartitionSessionRequest stop_partition_session_request = 6; + CommitOffsetResponse commit_offset_response = 7; + PartitionSessionStatusResponse partition_session_status_response = 8; + UpdateTokenResponse update_token_response = 9; + } + + // Handshake response. + message InitResponse { + // Read session identifier for debug purposes. + string session_id = 1; + } + + // Command to create and start a partition session. + // Client must react on this signal by sending StartPartitionSessionResponse when ready to recieve data from this partition. + message StartPartitionSessionRequest { + // Partition session description. + PartitionSession partition_session = 1; + + // Actual upper bound for all committed offsets. + // I.e. each offset up to and including (committed_offset - 1) was committed. + int64 committed_offset = 2; + + // First and next-after-last offsets of messages in partition. + OffsetsRange partition_offsets = 3; + } + + // Command to stop and destroy concrete partition session. + message StopPartitionSessionRequest { + // Identifier of partition session that is ready to be closed by server. + int64 partition_session_id = 1; + + // Flag of graceful stop. + // If True then server is waiting for response from client before giving of this partition for other read session. + // Server will not send more data from this partition. + // Client can process all received data and wait for commit and only after send response. + // If False then server gives partition for other session right now. + // All further commits for this PartitionSession has no effect. Server is not waiting for response. + bool graceful = 2; + + // Upper bound for committed offsets. + int64 committed_offset = 3; + } + + // Acknowledgement for commits. + message CommitOffsetResponse { + // Partitions with progress. + repeated PartitionCommittedOffset partitions_committed_offsets = 1; + + // Per-partition commit representation. + message PartitionCommittedOffset { + // Partition session identifier. + int64 partition_session_id = 1; + // Upper bound for committed offsets. + int64 committed_offset = 2; + } + } + + // Data read. + message ReadResponse { + // One client message representation. + message MessageData { + // Partition offset in partition that assigned for message. + int64 offset = 1; //unique value for clientside deduplication - Topic:Partition:Offset + // Sequence number that provided with message on write from client. + int64 seq_no = 2; + // Timestamp of creation of message provided on write from client. + google.protobuf.Timestamp created_at = 3; + // Compressed client message body. + bytes data = 5; + // Uncompressed size of client message body. + int64 uncompressed_size = 6; + + // Filled if partition_key / hash was used on message write. + string partition_key = 7; + bytes explicit_hash = 8; + } + + // Representation of sequence of client messages from one write session. + message Batch { + // List of client messages. + repeated MessageData message_data = 1; + + // Source identifier provided by client for this batch of client messages. + string message_group_id = 2; + // Client metadata attached to write session, the same for all messages in batch. + map<string, string> write_session_meta = 3; + + // Codec that is used for data compression. + // See enum Codec above for values. + int64 codec = 4; + + // Persist timestamp on server for batch. + google.protobuf.Timestamp written_at = 5; + } + + // Representation of sequence of messages from one partition. + message PartitionData { + int64 partition_session_id = 1; + + // Client messages, divided by write sessions. + repeated Batch batches = 2; + } + + // Client messages, divided by partitions. + repeated PartitionData partition_data = 1; + + // Total size in bytes of this response as calculated by server. + // See ReadRequest comment above. + int64 bytes_size = 2; + } + + // Response for status request. + message PartitionSessionStatusResponse { + // Identifier of partition session whose status was requested. + int64 partition_session_id = 1; + + // First and next-after-last offsets of messages in partition. + OffsetsRange partition_offsets = 2; + + // Upper bound for committed offsets. + int64 committed_offset = 3; + + // Write timestamp of next message written to this partition will be no less than write_time_high_watermark. + google.protobuf.Timestamp write_time_high_watermark = 4; + } +} + + +// Drop topic request sent from client to server. +message DropTopicRequest { + Ydb.Operations.OperationParams operation_params = 1; + // Topic path. + string path = 2; +} + + +// Drop topic response sent from server to client. If topic is not existed then response status will be "SCHEME_ERROR". +message DropTopicResponse { + // Result of request will be inside operation. + Ydb.Operations.Operation operation = 1; +} + +// Drop topic result message that will be inside DropTopicResponse.operation. +message DropTopicResult { +} + +// Message for describing topic internals. +message TopicSettings { + // How many partitions in topic. Must less than database limit. Default limit - 10. + int32 partitions_count = 1 [(value) = "> 0"]; + oneof retention { + // How long data in partition should be stored. Must be greater than 0 and less than limit for this database. + // Default limit - 36 hours. + google.protobuf.Duration retention_period = 2; + // How much data in partition should be stored. Must be greater than 0 and less than limit for this database. + int64 retention_storage_bytes = 14 [(value) = ">= 0"]; + } + // How long last written sequence number for message group should be stored. Must be greater then retention_period and less then limit for this database. Default limit - 16 days. + google.protobuf.Duration message_group_seq_no_retention_period = 12; + // How many last written sequence number for various message groups should be stored per partition. Must be less than limit for this database. Default limit - 6*10^6 values. + int64 max_partition_message_groups_seq_no_stored = 13 [(value) = ">= 0"]; + reserved 3; + reserved "supported_format"; + // List of allowed codecs for writers. + // See enum Codec above for values. + // Writes with codec not from this list are forbiden. + repeated int64 supported_codecs = 4 [(size).le = 100]; + // Max storage usage for each topic's partition. Must be less than database limit. Default limit - 130 GB. + int64 max_partition_storage_size = 5 [(value) = ">= 0"]; + // Partition write speed in bytes per second. Must be less than database limit. Default limit - 1 MB/s. + int64 max_partition_write_speed = 6 [(value) = ">= 0"]; + // Burst size for write in partition, in bytes. Must be less than database limit. Default limit - 1 MB. + int64 max_partition_write_burst = 7 [(value) = ">= 0"]; + + // Disallows client writes. Used for mirrored topics in federation. + bool client_write_disabled = 8; + // Message for read rules description. + message Consumer { + // Must have valid not empty name as a key. + string name = 1 [(required) = true]; + // Flag that this consumer is important. + bool important = 2; + // All messages with smaller timestamp of write will be skipped. + google.protobuf.Timestamp start_from_written_at = 3; + reserved 4; + reserved "supported_format"; + // List of supported codecs by this consumer. + // See enum Codec above for values. + // supported_codecs on topic must be contained inside this list. + repeated int64 supported_codecs = 5 [(size).le = 100]; + + // Read rule version. Any non-negative integer. + int64 version = 6 [(value) = ">= 0"]; + + // Client service type. + string service_type = 7; + } + + // List of consumer read rules for this topic. + repeated Consumer consumers = 9 [(size).le = 3000]; + + // User and server attributes of topic. Server attributes starts from "_" and will be validated by server. + map<string, string> attributes = 10; +} + +// Create topic request sent from client to server. +message CreateTopicRequest { + Ydb.Operations.OperationParams operation_params = 1; + // Topic path. + // e.g. /database_path/dir/my-topic + // or /database_path/dir/my-topic.federation-mirrors/sas + string path = 2 [(required) = true]; + // Topic settings. + TopicSettings settings = 4; +} + + +// Create topic response sent from server to client. If topic is already exists then response status will be "ALREADY_EXISTS". +message CreateTopicResponse { + // Result of request will be inside operation. + Ydb.Operations.Operation operation = 1; +} + +// Create topic result message that will be inside CreateTopicResponse.operation. +message CreateTopicResult { +} + +// Update existing topic request sent from client to server. +message AlterTopicRequest { + Ydb.Operations.OperationParams operation_params = 1; + // Topic path. + string path = 2 [(required) = true]; + // New topic settings to be set. All options inside should be set despite same value. + TopicSettings settings = 4; +} + +// Update topic response sent from server to client. +message AlterTopicResponse { + // Result of request will be inside operation. + Ydb.Operations.Operation operation = 1; +} + +// Update topic result message that will be inside UpdateTopicResponse.operation. +message AlterTopicResult { +} + +// Add consumer for existing topic request. +message AddConsumerRequest { + Ydb.Operations.OperationParams operation_params = 1; + // Topic path. + string topic = 2 [(required) = true]; + // consumers to add + TopicSettings.Consumer consumer = 3; +} + +// Add consumer for existing topic response. +message AddConsumerResponse { + // Result of request will be inside operation. + Ydb.Operations.Operation operation = 1; +} + +// Add consumer result message that will be inside AddConsumerReponse.operation. +message AddConsumerResult { +} + +// Remove consumer request for existing topic. +message RemoveConsumerRequest { + Ydb.Operations.OperationParams operation_params = 1; + // Topic path. + string topic = 2 [(required) = true]; + // Name of consumer to remove. + string consumer = 3; +} + +// Remove consumer response for existing topic. +message RemoveConsumerResponse { + // Result of request will be inside operation. + Ydb.Operations.Operation operation = 1; +} + +// Remove consumer result message that will be inside RemoveConsumerReponse.operation. +message RemoveConsumerResult { +} + +// Describe topic request sent from client to server. +message DescribeTopicRequest { + Ydb.Operations.OperationParams operation_params = 1; + + // Topic path. + string path = 2 [(required) = true]; + + // If this message is present, response will include runtime topic statistics. + IncludeStats include_stats = 3; + + message IncludeStats { + // Consumer statistics for reading this topic may be included in response. + string consumer = 1; + } +} + +// Describe topic response sent from server to client. If topic is not existed then response status will be "SCHEME_ERROR". +message DescribeTopicResponse { + // Result of request will be inside operation. + Ydb.Operations.Operation operation = 1; +} + +// Describe topic result message that will be inside DescribeTopicResponse.operation. +message DescribeTopicResult { + // Topic path. + Ydb.Scheme.Entry self = 1; + // Settings of topic. + TopicSettings settings = 2; + + // Message containing information about concrete topic reading, if requested. + TopicStats topic_stats = 3; + + message TopicStats { + repeated PartitionStats partition_stats = 1; + + // Message containing information about concrete topic's partition reading. + message PartitionStats { + // Patition identifier inside topic. + int64 partition_id = 1; + + // Request status of partition. + Ydb.StatusIds.StatusCode status = 2; + // Issues if any. + repeated Ydb.Issue.IssueMessage issues = 3; + + // First and next-after-last offsets of messages in partition. + OffsetsRange partition_offsets = 4; + + // Host name of node where partition leader is running. + string tablet_node = 5; + + // Statistics of particular consumer, if requested. + ConsumerStats consumer_stats = 6; + + message ConsumerStats { + // Consumer name. + string name = 1; + + // Offset of consumer committed message a.k.a. first not processed message. + // If commit_offset == end_offset then all messages from partition are processed. + int64 commit_offset = 2; + // Consumer lag in time between committed and last messages in partition. + int64 commit_time_lag_ms = 3; + + // Offset ranges client wants to commit, but server is waiting for commits of gaps. + repeated OffsetsRange out_of_order_commit_offset_ranges = 4; + + // Offset of first not read message by consumer from this partition. + // read_offset can be bigger that committed_offset - consumer could read some messages but not yet commit them. + int64 read_offset = 5; + // Consumer lag in time between read and last messages in partition. + int64 read_time_lag_ms = 6; + + // Session identifier that locked and reading this partition right now. + string session_id = 7; + // Ip if node that created reading this session. + string client_node = 8; + // Host name of proxy node that processing this reading session. + string proxy_node = 9; + + // Assign identifier of actual partition assignment. + int64 partition_session_id = 10; + // Timestamp of partition session start. + google.protobuf.Timestamp partition_session_started_at = 11; + } + } + } +} |