diff options
| author | ildar-khisam <[email protected]> | 2022-05-24 11:57:31 +0300 |
|---|---|---|
| committer | ildar-khisam <[email protected]> | 2022-05-24 11:57:31 +0300 |
| commit | cdb547ea095e178dfd356b78583c919b4243f7db (patch) | |
| tree | a6ad5c0b304c2a7849bb2d4c3f9197745e257765 | |
| parent | 0ece656f216a197fb7535ac68d399dee6b8ddd84 (diff) | |
rename pq grpc proto messages
rename StreamingRead proto messages
ref:c4be9b0b221e1569d042c92d7697d3f62f22039b
| -rw-r--r-- | ydb/public/api/protos/ydb_persqueue_v1.proto | 707 | ||||
| -rw-r--r-- | ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.cpp | 2 | ||||
| -rw-r--r-- | ydb/services/persqueue_v1/actors/helpers.cpp | 6 | ||||
| -rw-r--r-- | ydb/services/persqueue_v1/actors/helpers.h | 2 | ||||
| -rw-r--r-- | ydb/services/persqueue_v1/actors/partition_actor.cpp | 93 | ||||
| -rw-r--r-- | ydb/services/persqueue_v1/actors/read_session_actor.ipp | 93 | ||||
| -rw-r--r-- | ydb/services/persqueue_v1/persqueue_ut.cpp | 114 | ||||
| -rw-r--r-- | ydb/services/persqueue_v1/ut/pq_data_writer.h | 2 |
8 files changed, 336 insertions, 683 deletions
diff --git a/ydb/public/api/protos/ydb_persqueue_v1.proto b/ydb/public/api/protos/ydb_persqueue_v1.proto index f3bd203fd14..af0f7c1ed0d 100644 --- a/ydb/public/api/protos/ydb_persqueue_v1.proto +++ b/ydb/public/api/protos/ydb_persqueue_v1.proto @@ -24,7 +24,7 @@ enum Codec { CODEC_ZSTD = 4; } -message SessionMetaValue { +message MetaValue { map<string, string> value = 1; } @@ -70,7 +70,7 @@ message StreamingWriteClientMessage { // Zero means any group. int64 partition_group_id = 4; - int64 max_supported_block_format_version = 5; + int64 max_supported_format_version = 5; string session_id = 100; // 0 for first init message and incremental value for connect retries. Used for server logging. @@ -128,18 +128,18 @@ message StreamingWriteClientMessage { * UpdateTokenResponse - acknowledgment of reauthentication and reauthorization. */ message StreamingWriteServerMessage { - oneof server_message { - InitResponse init_response = 3; - BatchWriteResponse batch_write_response = 4; - UpdateTokenResponse update_token_response = 5; - } - // Server status of response. Ydb.StatusIds.StatusCode status = 1; // Issues if any. repeated Ydb.Issue.IssueMessage issues = 2; + oneof server_message { + InitResponse init_response = 3; + BatchWriteResponse batch_write_response = 4; + UpdateTokenResponse update_token_response = 5; + } + // Response for handshake. message InitResponse { // Last persisted message's sequence number for this message group. @@ -235,34 +235,55 @@ message CommitOffsetRange { uint64 end_offset = 3; } -// TODO: replace with it actual protocol client message +/** + * Message that represents concrete partition session. + */ + message PartitionSession { + // Topic path of partition. + string topic = 1; + // Cluster of topic instance. + string cluster = 2; + // Partition identifier. Explicit only for debug purposes. + int64 partition_id = 3; + // Partition group identifier. Explicit only for debug purposes. + int64 partition_group_id = 4; + + // Identitifier of partition stream. Unique inside one RPC call. + int64 partition_session_id = 6; + + // Opaque blob. Provide it with partition stream in state for session reconnects. + bytes connection_meta = 7; +} + /** * Request for read session. Contains one of: * InitRequest - handshake request. * ReadRequest - request for data. * CommitRequest - request for commit of some read data. - * CreatePartitionStreamResponse - signal for server that client is ready to get data from partition. - * DestroyPartitionStreamResponse - signal for server that client finished working with partition. Must be sent only after corresponding Release request from server. - * StopReadRequest - signal for server that client is not ready to get more data from this partition. + * StartPartitionSessionResponse - signal for server that client is ready to get data from partition. + * StopPartitionSessionResponse - signal for server that client finished working with partition. Must be sent only after corresponding request from server. + * PauseReadRequest - signal for server that client is not ready to get more data from this partition. * ResumeReadRequest - signal for server that client is ready to get more data from this partition. + * PartitionSessionStatusRequest - request for session status + * AddTopicRequest - request for add topic + * RemoveTopicRequest - request for topic removal + * UpdateTokenRequest - request to update auth token */ -message StreamingReadClientMessageNew { +message StreamingReadClientMessage { oneof client_message { InitRequest init_request = 1; ReadRequest read_request = 2; - CreatePartitionStreamResponse create_partition_stream_response = 3; + StartPartitionSessionResponse start_partition_session_response = 3; CommitRequest commit_request = 4; - DestroyPartitionStreamResponse destroy_partition_stream_response = 5; - StopReadRequest stop_read_request = 6; + StopPartitionSessionResponse stop_partition_session_response = 5; + PauseReadRequest pause_read_request = 6; ResumeReadRequest resume_read_request = 7; - PartitionStreamStatusRequest partition_stream_status_request = 8; + PartitionSessionStatusRequest partition_session_status_request = 8; AddTopicRequest add_topic_request = 9; RemoveTopicRequest remove_topic_request = 10; + UpdateTokenRequest update_token_request = 11; } - // User credentials if update is needed or empty string. - string token = 20; - // Handshake request. message InitRequest { // Message that describes topic to read. @@ -281,38 +302,38 @@ message StreamingReadClientMessageNew { // Maximum block format version supported by the client. Server will asses this parameter and return actual data blocks version in // StreamingReadServerMessage.InitResponse.block_format_version_by_topic (and StreamingReadServerMessage.AddTopicResponse.block_format_version) // or error if client will not be able to read data. - int64 max_supported_block_format_version = 6; + int64 max_supported_format_version = 6; // Maximal size of client cache for message_group_id, ip and meta, per partition. - // There is separate caches for each partition partition streams. - // There is separate caches for message group identifiers, ip and meta inside one partition partition stream. + // There are separate caches for each partition partition sessions. + // There are separate caches for message group identifiers, ip and meta inside one partition session. int64 max_meta_cache_size = 10; // State of client read session. Could be provided to server for retries. message State { - message PartitionStreamState { + message PartitionSessionState { enum Status { // Not used state. STATUS_UNSPECIFIED = 0; - // Client seen Create message but not yet responded to server with Created message. - CREATING = 1; - // Client seen Destroy message but not yet responded with Released message. - DESTROYING = 2; - // Client sent Created or ResumeReadRequest message to server for this partition stream. + // Client saw StartPartitionSessionRequest message but not yet responded. + STARTING = 1; + // Client saw StopPartitionSessionRequest message but not yet responded. + STOPPING = 2; + // Client sent StartPartitionSessionResponse or ResumeReadRequest message to server for this partition session. READING = 3; - // Client sent StopReadRequest for this partition stream. - STOPPED = 4; + // Client sent PauseReadRequest for this partition session. + PAUSED = 4; } - // Partition partition stream. - PartitionStream partition_stream = 1; + // Partition partition session. + PartitionSession partition_session = 1; // Current read offset if has one. Actual for states DESTROYING, READING and STOPPED. int64 read_offset = 2; // Ranges of committed by client offsets. - repeated OffsetsRange offset_ranges = 3; - // Status of partition stream. + repeated OffsetsRange offsets_ranges = 3; + // Status of partition session. Status status = 4; } - repeated PartitionStreamState partition_streams_states = 1; + repeated PartitionSessionState partition_sessions_states = 1; } // Session identifier for retries. Must be the same as session_id from Inited server response. If this is first connect, not retry - do not use this field. @@ -335,9 +356,9 @@ message StreamingReadClientMessageNew { } // Signal for server that cient is ready to recive data for partition. - message CreatePartitionStreamResponse { - // Partition stream identifier of partition to start read. - int64 partition_stream_id = 1; + message StartPartitionSessionResponse { + // Partition session identifier of partition to start read. + int64 partition_session_id = 1; // Start reading from partition offset that is not less than read_offset. // Init.max_time_lag_ms and Init.read_timestamp_ms could lead to skip of more messages. @@ -356,24 +377,24 @@ message StreamingReadClientMessageNew { // Signal for server that client finished working with this partition. Must be sent only after corresponding Release request from server. // Server will give this partition to other read session only after Released signal. - message DestroyPartitionStreamResponse { - // Partition stream identifier of partition partition stream that is released by client. - int64 partition_stream_id = 1; + message StopPartitionSessionResponse { + // Partition session identifier of partition session that is released by client. + int64 partition_session_id = 1; } // Signal for server that client is not ready to recieve more data from this partition. - message StopReadRequest { - repeated int64 partition_stream_ids = 1; + message PauseReadRequest { + repeated int64 partition_session_ids = 1; } // Signal for server that client is ready to receive more data from this partition. message ResumeReadRequest { - repeated int64 partition_stream_ids = 1; + repeated int64 partition_session_ids = 1; // Offset to start reading - may be smaller than known one in case of dropping of read-ahead in client lib. repeated int64 read_offsets = 2; - // Cookie for matching data from PartitionStream after resuming. Must be greater than zero. + // Cookie for matching data from PartitionSession after resuming. Must be greater than zero. repeated int64 resume_cookies = 3; } @@ -383,16 +404,16 @@ message StreamingReadClientMessageNew { repeated PartitionCommit commits = 1; } - message PartitionStreamStatusRequest { - int64 partition_stream_id = 1; + message PartitionSessionStatusRequest { + int64 partition_session_id = 1; } - // Add topic to current read session + // Add topic to current read session. message AddTopicRequest { TopicReadSettings topic_read_settings = 1; } - // Remove topic from current read session + // Remove topic from current read session. message RemoveTopicRequest { string topic = 1; } @@ -411,40 +432,48 @@ message StreamingReadClientMessageNew { * Message that is used for describing commit. */ message PartitionCommit { - // Identifier of partition stream with data to commit. - int64 partition_stream_id = 1; + // Identifier of partition session with data to commit. + int64 partition_session_id = 1; // Processed ranges. repeated OffsetsRange offsets = 2; } } -// TODO: replace with it actual protocol server message /** * Response for read session. Contains one of : * InitResponse - handshake response from server. - * BatchReadResponse - portion of data. + * ReadResponse - portion of data. + * StartPartitionSessionRequest - command from server to create a partition partition session. + * StopPartitionSessionRequest - command from server to destroy a partition partition session. * CommitResponse - acknowledgment for commit. - * CreatePartitionStreamRequest - command from server to create a partition partition stream. - * DestroyPartitionStreamRequest - command from server to destroy a partition partition stream. + * PartitionSessionStatusResponse - server response with partition session status. + * PauseReadResponse - acknowledgment for pausing read from this partition. + * ResumeReadResponse - acknowledgment for resuming read from this partition. + * AddTopicResponse - acknowledgment of topic adding. + * RemoveTopicResponse - acknowledgment of topic removal. + * UpdateTokenResponse - acknowledgment of token update. */ -message StreamingReadServerMessageNew { - oneof server_message { - InitResponse init_response = 3; - BatchReadResponse batch_read_response = 4; - CreatePartitionStreamRequest create_partition_stream_request = 5; - DestroyPartitionStreamRequest destroy_partition_stream_request = 6; - CommitResponse commit_response = 7; - PartitionStreamStatusResponse partition_stream_status_response = 8; - StopReadResponse stop_read_response = 9; - ResumeReadResponse resume_read_response = 10; - AddTopicResponse add_topic_response = 11; - RemoveTopicResponse remove_topic_response = 12; - } - +message StreamingReadServerMessage { + // Server status of response. Ydb.StatusIds.StatusCode status = 1; + // Issues if any. repeated Ydb.Issue.IssueMessage issues = 2; + oneof server_message { + InitResponse init_response = 3; + ReadResponse read_response = 4; + StartPartitionSessionRequest start_partition_session_request = 6; + StopPartitionSessionRequest stop_partition_session_request = 7; + CommitResponse commit_response = 8; + PartitionSessionStatusResponse partition_session_status_response = 9; + PauseReadResponse pause_read_response = 10; + ResumeReadResponse resume_read_response = 11; + AddTopicResponse add_topic_response = 12; + RemoveTopicResponse remove_topic_response = 13; + UpdateTokenResponse update_token_response = 14; + } + // Handshake response. message InitResponse { // Read session identifier for debug purposes. @@ -452,16 +481,16 @@ message StreamingReadServerMessageNew { // Block format version of data client will receive from topics. map<string, int64> block_format_version_by_topic = 2; - // Choosed maximan cache size by server. + // Chosen maximal cache size by server. // Client must use cache of this size. Could change on retries - reduce size of cache in this case. int64 max_meta_cache_size = 10; } - // Command to create a partition partition stream. + // Command to create and start a partition session. // Client must react on this signal by sending StartRead when ready recieve data from this partition. - message CreatePartitionStreamRequest { + message StartPartitionSessionRequest { // Partition partition stream description. - PartitionStream partition_stream = 1; + PartitionSession partition_session = 1; // Actual committed offset. int64 committed_offset = 2; @@ -470,17 +499,17 @@ message StreamingReadServerMessageNew { } - // Command to destroy concrete partition stream. - message DestroyPartitionStreamRequest { - // Identifier of partition partition stream that is ready to be closed by server. - int64 partition_stream_id = 1; + // Command to stop and destroy concrete partition session. + message StopPartitionSessionRequest { + // Identifier of partition partition session that is ready to be closed by server. + int64 partition_session_id = 1; - // Flag of gracefull or not destroy. - // If True then server is waiting for Destroyed signal from client before giving of this partition for other read session. + // Flag of graceful stop. + // If True then server is waiting for response from client before giving of this partition for other read session. // Server will not send more data from this partition. - // Client can process all received data and wait for commit and only after send Destroyed signal. + // Client can process all received data and wait for commit and only after send response. // If False then server gives partition for other session right now. - // All futher commits for this PartitionStream has no effect. Server is not waiting for Destroyed signal. + // All further commits for this PartitionSession has no effect. Server is not waiting for response. bool graceful = 2; // Last known committed offset. @@ -491,8 +520,8 @@ message StreamingReadServerMessageNew { message CommitResponse { // Per-partition commit representation. message PartitionCommittedOffset { - // Partition partition stream identifier. - int64 partition_stream_id = 1; + // Partition partition session identifier. + int64 partition_session_id = 1; // Last committed offset. int64 committed_offset = 2; } @@ -500,119 +529,58 @@ message StreamingReadServerMessageNew { repeated PartitionCommittedOffset partitions_committed_offsets = 1; } - // Readed data. - message BatchReadResponse { + // Data read. + message ReadResponse { // One client message representation. - // Client lib must send commit right now for all skipped data (from it's read offset till first offset in range). - message PartitionData { - // Data inside this message is from partition stream with this identifier. - int64 partition_stream_id = 1; - - // Offsets in partition that assigned for messages. - // Unique value for clientside deduplication - (topic, cluster, partition_id, offset). - repeated int64 offsets = 2; - // Sequence numbers that provided with messages on write from client. - // Same size as offsets. - // Unique value for clientside deduplication - (topic, cluster, message_group_id, sequence_number). - repeated int64 sequence_numbers = 3; - // Timestamps of creation of messages provided on write from client. - // Same size as offsets. - repeated int64 created_at_ms = 4; - // Timestamps of writing in partition for client messages. - // Same size as offsets. - repeated int64 written_at_ms = 5; - - // New messageGroupIds for updating cache. - // Size of vector is the same as number of negative values in message_group_id_indexes. - repeated string message_group_ids = 6; - // Indexes of messageGroupIds. - // same size as offsets. - // Negative values (-X) means - put first not used messageGroupId from message_group_ids on index X in cache and use it for this client message. - // Positive values (X) means -use element by index X from cache for this client message. Do not change state of cache. - // Assumptions: - // - Server will use positive values only for proposed before indexes. - // - Each value is from 1 to max_meta_cache_size by abs. - // - Do not make assumptions about choosing algorihm. - // - There is separate caches of size max_meta_cache_size for different partition and different metadata fileds - message_group_id, ip and session_meta_data. - // - Number of negative values in message_group_id_indexes vector is the same as length of message_group_ids vector. - // Example: - // max_meta_cache_size : 2 - // Cache indexes : 1 2 - // Cache state before processing : s0,? // ? means not set yet. - // - // message_group_ids : s1 s2 s3 s1 - // message_group_id_indexes : -1 -2 1 2 1 1 -1 2 -2 - // cache state : s1,? s1,s2 s1,s2 s1,s2 s1,s2 s1,s2 s3,s2 s3,s2 s3,s1 - // real message group ids : s1 s2 s1 s2 s1 s1 s3 s2 s1 - // Cache indexes : 1 2 - // Cache state after processing : s3,s1 - repeated sint64 message_group_id_indexes = 7; - - // New ips for updating ip cache. - repeated string ips = 8; - // Same as message_group_id_indexes but for ips. - repeated sint64 ip_indexes = 9; - - // New session meta datas for updating cache. - repeated SessionMetaValue message_session_meta = 10; - // Same as message_group_id_indexes but for session meta data. - repeated sint64 message_session_meta_indexes = 11; - - // Client messages sizes. - // Same size as offsets. - repeated int64 message_sizes = 12; - - // Block must contain whole client message when it's size is not bigger that max_block_size. - // If message is bigger than max_block_size - it will be transferred as SIZE/max_block_size blocks. All of this blocks will be with block_count = 0 but not the last one - last one's block_count will be 0; - // Blocks can be reordered upto provided by client uncompressed free buffer size. - // blocks: A A A B B B CDE - // offset: 1 1 1 4 4 4 6 - // part_number: 0 1 2 0 1 2 0 - // count: 0 0 1 0 0 1 3 - // Offset will be the same as in Offsets. - repeated int64 blocks_offsets = 13; - repeated int64 blocks_part_numbers = 14; - // How many complete messages and imcomplete messages end parts (one at most) this block contains - repeated int64 blocks_message_counts = 15; - repeated int64 blocks_uncompressed_sizes = 16; - // In block format version 0 each byte contains only block codec identifier - repeated bytes blocks_headers = 17; - repeated bytes blocks_data = 18; - - // Zero if this is not first portion of data after resume or provided by client cookie otherwise. - int64 resume_cookie = 50; - - message ReadStatistics { - int64 blobs_from_cache = 1; - int64 blobs_from_disk = 2; - int64 bytes_from_head = 3; - int64 bytes_from_cache = 4; - int64 bytes_from_disk = 5; - int64 repack_duration_ms = 6; - } - ReadStatistics read_statistics = 100; + message MessageData { + // Partition offset in partition that assigned for message. + int64 offset = 1; //unique value for clientside deduplication - Topic:Cluster:Partition:Offset + // Sequence number that provided with message on write from client. + int64 seq_no = 2; + // Timestamp of creation of message provided on write from client. + int64 create_timestamp_ms = 3; + // Codec that is used for data compressing. + Codec codec = 4; + // Compressed client message body. + bytes data = 5; + // Uncompressed size of client message body. + int64 uncompressed_size = 6; + // kinesis data + string partition_key = 7; + bytes explicit_hash = 8; } - message SkipRange { - // Partition Stream identifier. - int64 partition_stream_id = 1; + // Representation of sequence of client messages from one write session. + message Batch { + // Source identifier provided by client for this batch of client messages. + bytes message_group_id = 2; + // Client metadata attached to write session, the same for all messages in batch. + MetaValue session_meta = 3; + // Persist timestamp on server for batch. + int64 write_timestamp_ms = 4; + // Peer address of node that created write session. + string ip = 5; - // When some data is skipped by client parameters (read_timestamp_ms for example) then range of skipped offsets is sended to client. - // Client lib must commit this range and change read_offset to end of this range. - OffsetsRange skip_range = 2; + // List of client messages. + repeated MessageData message_data = 1; } - repeated SkipRange skip_range = 1; + // Representation of sequence of messages from one partition. + message PartitionData { + int64 partition_session_id = 1; - // Per-partition data. - repeated PartitionData partitions = 2; + // Client messages, divided by write sessions. + repeated Batch batches = 2; + } + // Client messages, divided by partitions. + repeated PartitionData partition_data = 1; } - // Response for status requst. - message PartitionStreamStatusResponse { - // Identifier of partition partition stream that is ready to be closed by server. - int64 partition_stream_id = 1; + // Response for status request. + message PartitionSessionStatusResponse { + // Identifier of partition partition session that is ready to be closed by server. + int64 partition_session_id = 1; int64 committed_offset = 2; int64 end_offset = 3; @@ -621,7 +589,7 @@ message StreamingReadServerMessageNew { int64 written_at_watermark_ms = 4; } - message StopReadResponse { + message PauseReadResponse { } message ResumeReadResponse { @@ -664,7 +632,6 @@ message PartitionStream { * Start_read - signal for server that client is ready to get data from partition. * Released - signal for server that client finished working with partition. Must be sent only after corresponding Release request from server. */ - message MigrationStreamingReadClientMessage { message TopicReadSettings { // Topic path. @@ -824,8 +791,6 @@ message MigrationStreamingReadClientMessage { bytes token = 20; } - - /** * Response for read session. Contains one of : * Inited - handshake response from server. @@ -834,7 +799,6 @@ message MigrationStreamingReadClientMessage { * Assigned - signal from server for assigning of partition. * Release - signal from server for releasing of partition. */ - message MigrationStreamingReadServerMessage { // Handshake response. message InitResponse { @@ -986,7 +950,6 @@ message MigrationStreamingReadServerMessage { /** * Reading information request sent from client to server. */ - message ReadInfoRequest { Ydb.Operations.OperationParams operation_params = 1; // List of topics that are beeing read. @@ -1001,7 +964,6 @@ message ReadInfoRequest { /** * Reading information response sent from server to client. */ - message ReadInfoResponse { // Result of request will be inside operation. Ydb.Operations.Operation operation = 1; @@ -1010,7 +972,6 @@ message ReadInfoResponse { /** * Reading information message that will be inside ReadInfoResponse.operation. */ - message ReadInfoResult { // Message containing information about concrete topic reading. message TopicInfo { @@ -1079,11 +1040,9 @@ message ReadInfoResult { repeated TopicInfo topics = 1; } - /** * Drop topic request sent from client to server. */ - message DropTopicRequest { Ydb.Operations.OperationParams operation_params = 1; // Topic path. @@ -1094,7 +1053,6 @@ message DropTopicRequest { /** * Drop topic response sent from server to client. If topic is not existed then response status will be "SCHEME_ERROR". */ - message DropTopicResponse { // Result of request will be inside operation. Ydb.Operations.Operation operation = 1; @@ -1103,14 +1061,12 @@ message DropTopicResponse { /** * Drop topic result message that will be inside DropTopicResponse.operation. */ - message DropTopicResult { } /** * Credentials settings */ - message Credentials { message Iam { string endpoint = 1; @@ -1126,7 +1082,6 @@ message Credentials { /** * Message for describing topic internals. */ - message TopicSettings { enum Format { FORMAT_UNSPECIFIED = 0; @@ -1212,7 +1167,6 @@ message TopicSettings { /** * Create topic request sent from client to server. */ - message CreateTopicRequest { Ydb.Operations.OperationParams operation_params = 1; // Topic path. @@ -1225,7 +1179,6 @@ message CreateTopicRequest { /** * Create topic response sent from server to client. If topic is already exists then response status will be "ALREADY_EXISTS". */ - message CreateTopicResponse { // Result of request will be inside operation. Ydb.Operations.Operation operation = 1; @@ -1234,14 +1187,12 @@ message CreateTopicResponse { /** * Create topic result message that will be inside CreateTopicResponse.operation. */ - message CreateTopicResult { } /** * Update existing topic request sent from client to server. */ - message AlterTopicRequest { Ydb.Operations.OperationParams operation_params = 1; // Topic path. @@ -1250,11 +1201,9 @@ message AlterTopicRequest { TopicSettings settings = 4; } - /** * Update topic response sent from server to client. */ - message AlterTopicResponse { // Result of request will be inside operation. Ydb.Operations.Operation operation = 1; @@ -1291,7 +1240,6 @@ message AddReadRuleResponse { message AddReadRuleResult { } - /** * Remove read rules request for existing topic. */ @@ -1317,22 +1265,18 @@ message RemoveReadRuleResponse { message RemoveReadRuleResult { } - /** * Describe topic request sent from client to server. */ - message DescribeTopicRequest { Ydb.Operations.OperationParams operation_params = 1; // Topic path. string path = 2 [(required) = true]; } - /** * Describe topic response sent from server to client. If topic is not existed then response status will be "SCHEME_ERROR". */ - message DescribeTopicResponse { // Result of request will be inside operation. Ydb.Operations.Operation operation = 1; @@ -1341,7 +1285,6 @@ message DescribeTopicResponse { /** * Describe topic result message that will be inside DescribeTopicResponse.operation. */ - message DescribeTopicResult { // Topic path. Ydb.Scheme.Entry self = 1; @@ -1349,341 +1292,3 @@ message DescribeTopicResult { TopicSettings settings = 2; } - -/////////////////////////////////////////////////////////// - -message StreamingReadClientMessage { - /** - * Message that is used for describing commit. - */ - message PartitionCommit { - // Identifier of partition stream with data to commit. - int64 partition_stream_id = 1; - // Processed ranges. - repeated OffsetsRange offsets = 2; - } - - message TopicReadSettings { - // Topic path. - string topic = 1; - // Partition groups that will be read by this session. - // If list is empty - then session will read all partition groups. - repeated int64 partition_group_ids = 2; - // Read data only after this timestamp from this topic. - int64 start_from_written_at_ms = 3; - } - - // Handshake request. - message InitRequest { - // Message that describes topic to read. - // Topics that will be read by this session. - repeated TopicReadSettings topics_read_settings = 1; - // Flag that indicates reading only of original topics in cluster or all including mirrored. - bool read_only_original = 2; - // Path of consumer that is used for reading by this session. - string consumer = 3; - - // Skip all messages that has write timestamp smaller than now - max_time_lag_ms. - int64 max_lag_duration_ms = 4; - // Read data only after this timestamp from all topics. - int64 start_from_written_at_ms = 5; - - // Maximum block format version supported by the client. Server will asses this parameter and return actual data blocks version in - // StreamingReadServerMessage.InitResponse.block_format_version_by_topic (and StreamingReadServerMessage.AddTopicResponse.block_format_version) - // or error if client will not be able to read data. - int64 max_supported_block_format_version = 6; - - // Maximal size of client cache for message_group_id, ip and meta, per partition. - // There is separate caches for each partition partition streams. - // There is separate caches for message group identifiers, ip and meta inside one partition partition stream. - int64 max_meta_cache_size = 10; - - // State of client read session. Could be provided to server for retries. - message State { - message PartitionStreamState { - enum Status { - // Not used state. - STATUS_UNSPECIFIED = 0; - // Client seen Create message but not yet responded to server with Created message. - CREATING = 1; - // Client seen Destroy message but not yet responded with Released message. - DESTROYING = 2; - // Client sent Created or ResumeReadRequest message to server for this partition stream. - READING = 3; - // Client sent StopReadRequest for this partition stream. - STOPPED = 4; - } - // Partition partition stream. - PartitionStream partition_stream = 1; - // Current read offset if has one. Actual for states DESTROYING, READING and STOPPED. - int64 read_offset = 2; - // Ranges of committed by client offsets. - repeated OffsetsRange offset_ranges = 3; - // Status of partition stream. - Status status = 4; - } - repeated PartitionStreamState partition_streams_states = 1; - } - - // Session identifier for retries. Must be the same as session_id from Inited server response. If this is first connect, not retry - do not use this field. - string session_id = 100; - // 0 for first init message and incremental value for connect retries. - int64 connection_attempt = 101; - // Formed state for retries. If not retry - do not use this field. - State state = 102; - - int64 idle_timeout_ms = 200; - - - //////////////////////////////////////////////////////////////////////////////////////////////////////////// - // TODO: remove after adding BatchReadResponse - // Single read request params. - ReadParams read_params = 42; - } - - // TODO: add topics/groups and remove them from reading - - // Message that represents client readiness for receiving more data. - message ReadRequest { - // Client acquired this amount of free bytes more for buffer. Server can send more data at most of this uncompressed size. - // Subsequent messages with 5 and 10 request_uncompressed_size are treated by server that it can send messages for at most 15 bytes. - int64 request_uncompressed_size = 1; - } - - // Signal for server that cient is ready to recive data for partition. - message CreatePartitionStreamResponse { - // Partition stream identifier of partition to start read. - int64 partition_stream_id = 1; - - // Start reading from partition offset that is not less than read_offset. - // Init.max_time_lag_ms and Init.read_timestamp_ms could lead to skip of more messages. - // The same with actual committed offset. Regardless of set read_offset server will return data from maximal offset from read_offset, actual committed offset - // and offsets calculated from Init.max_time_lag_ms and Init.read_timestamp_ms. - int64 read_offset = 2; - // All messages with offset less than commit_offset are processed by client. Server will commit this position if this is not done yet. - int64 commit_offset = 3; - - // This option will enable sanity check on server for read_offset. Server will verify that read_offset is no less that actual committed offset. - // If verification will fail then server will kill this read session and client will find out error in reading logic. - // If client is not setting read_offset, sanity check will fail so do not set verify_read_offset if you not setting correct read_offset. - bool verify_read_offset = 4; - - } - - // Signal for server that client processed some read data. - message CommitRequest { - // Partition offsets that indicates processed data. - repeated PartitionCommit commits = 1; - } - - // Signal for server that client finished working with this partition. Must be sent only after corresponding Release request from server. - // Server will give this partition to other read session only after Released signal. - message DestroyPartitionStreamResponse { - // Partition stream identifier of partition partition stream that is released by client. - int64 partition_stream_id = 1; - } - - // Signal for server that client is not ready to recieve more data from this partition. - message StopReadRequest { - repeated int64 partition_stream_ids = 1; - } - - // Signal for server that client is ready to receive more data from this partition. - message ResumeReadRequest { - repeated int64 partition_stream_ids = 1; - - // Offset to start reading - may be smaller than known one in case of dropping of read-ahead in client lib. - repeated int64 read_offsets = 2; - - // Cookie for matching data from PartitionStream after resuming. Must be greater than zero. - repeated int64 resume_cookies = 3; - } - - message PartitionStreamStatusRequest { - int64 partition_stream_id = 1; - } - - // Add topic to current read session - message AddTopicRequest { - TopicReadSettings topic_read_settings = 1; - } - - // Remove topic from current read session - message RemoveTopicRequest { - string topic = 1; - } - - oneof client_message { - InitRequest init_request = 1; - ReadRequest read_request = 2; - CreatePartitionStreamResponse create_partition_stream_response = 3; - CommitRequest commit_request = 4; - DestroyPartitionStreamResponse destroy_partition_stream_response = 5; - StopReadRequest stop_read_request = 6; - ResumeReadRequest resume_read_request = 7; - PartitionStreamStatusRequest partition_stream_status_request = 8; - AddTopicRequest add_topic_request = 9; - RemoveTopicRequest remove_topic_request = 10; - UpdateTokenRequest update_token_request = 11; - } -} - -message StreamingReadServerMessage { - // Handshake response. - message InitResponse { - // Read session identifier for debug purposes. - string session_id = 1; - // Block format version of data client will receive from topics. - map<string, int64> block_format_version_by_topic = 2; - - // Choosed maximal cache size by server. - // Client must use cache of this size. Could change on retries - reduce size of cache in this case. - int64 max_meta_cache_size = 10; - } - - // Readed data. - message DataBatch { - // One client message representation. - message MessageData { - // Partition offset in partition that assigned for message. - uint64 offset = 1; //unique value for clientside deduplication - Topic:Cluster:Partition:Offset - // Sequence number that provided with message on write from client. - uint64 seq_no = 2; - // Timestamp of creation of message provided on write from client. - uint64 create_timestamp_ms = 3; - // Codec that is used for data compressing. - Codec codec = 4; - // Compressed client message body. - bytes data = 5; - // Uncompressed size of client message body. - uint64 uncompressed_size = 6; - // kinesis data - string partition_key = 7; - bytes explicit_hash = 8; - } - - // Representation of sequence of client messages from one write session. - message Batch { - // Source identifier provided by client for this batch of client messages. - bytes source_id = 2; - // Client metadata attached to write session, the same for all messages in batch. - repeated KeyValue extra_fields = 3; - // Persist timestamp on server for batch. - uint64 write_timestamp_ms = 4; - // Peer address of node that created write session. - string ip = 5; - - // List of client messages. - repeated MessageData message_data = 1; - } - - // Representation of sequence of messages from one partition. - message PartitionData { - // Partition's topic path. - Path topic = 1; - // Topic's instance cluster name. - string cluster = 2; - // Partition identifier. topic:cluster:partition is unique addressing for partition. - uint64 partition = 3; - - // Client messages, divided by write sessions. - repeated Batch batches = 4; - - // Cookie for addressing this partition messages batch for committing. - CommitCookie cookie = 5; - - // Old formatted topic name with cluster inside. - string deprecated_topic = 10; - } - - // Client messages, divided by partitions. - repeated PartitionData partition_data = 1; - } - - // Command to create a partition partition stream. - // Client must react on this signal by sending StartRead when ready recieve data from this partition. - message CreatePartitionStreamRequest { - // Partition partition stream description. - PartitionStream partition_stream = 1; - - // Actual committed offset. - int64 committed_offset = 2; - // Offset of first not existing message in partition till now. - int64 end_offset = 3; - - } - - // Command to destroy concrete partition stream. - message DestroyPartitionStreamRequest { - // Identifier of partition partition stream that is ready to be closed by server. - int64 partition_stream_id = 1; - - // Flag of gracefull or not destroy. - // If True then server is waiting for Destroyed signal from client before giving of this partition for other read session. - // Server will not send more data from this partition. - // Client can process all received data and wait for commit and only after send Destroyed signal. - // If False then server gives partition for other session right now. - // All futher commits for this PartitionStream has no effect. Server is not waiting for Destroyed signal. - bool graceful = 2; - - // Last known committed offset. - int64 committed_offset = 3; - } - - // Acknowledgement for commits. - message CommitResponse { - // Per-partition commit representation. - message PartitionCommittedOffset { - // Partition partition stream identifier. - int64 partition_stream_id = 1; - // Last committed offset. - int64 committed_offset = 2; - } - // Partitions with progress. - repeated PartitionCommittedOffset partitions_committed_offsets = 1; - } - - // Response for status requst. - message PartitionStreamStatusResponse { - // Identifier of partition partition stream that is ready to be closed by server. - int64 partition_stream_id = 1; - - int64 committed_offset = 2; - int64 end_offset = 3; - - // WriteTimestamp of next message (and end_offset) will be not less that WriteWatermarkMs. - int64 written_at_watermark_ms = 4; - } - - message StopReadResponse { - } - - message ResumeReadResponse { - } - - message AddTopicResponse { - // Block format version of data client will receive from the topic. - int64 block_format_version = 1; - } - - message RemoveTopicResponse { - } - - Ydb.StatusIds.StatusCode status = 1; - - repeated Ydb.Issue.IssueMessage issues = 2; - - oneof server_message { - InitResponse init_response = 3; - DataBatch data_batch = 4; - CreatePartitionStreamRequest create_partition_stream_request = 5; - DestroyPartitionStreamRequest destroy_partition_stream_request = 6; - CommitResponse commit_response = 7; - PartitionStreamStatusResponse partition_stream_status_response = 8; - StopReadResponse stop_read_response = 9; - ResumeReadResponse resume_read_response = 10; - AddTopicResponse add_topic_response = 11; - RemoveTopicResponse remove_topic_response = 12; - UpdateTokenResponse update_token_response = 13; - } -} diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.cpp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.cpp index fab21d7617f..ea771e701c5 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.cpp +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.cpp @@ -543,7 +543,7 @@ void TWriteSession::InitImpl() { if (Settings.PartitionGroupId_) { init->set_partition_group_id(*Settings.PartitionGroupId_); } - init->set_max_supported_block_format_version(0); + init->set_max_supported_format_version(0); init->set_preferred_cluster(PreferredClusterByCDS); for (const auto& attr : Settings.Meta_.Fields) { diff --git a/ydb/services/persqueue_v1/actors/helpers.cpp b/ydb/services/persqueue_v1/actors/helpers.cpp index 46adacd6ff1..779cf312c13 100644 --- a/ydb/services/persqueue_v1/actors/helpers.cpp +++ b/ydb/services/persqueue_v1/actors/helpers.cpp @@ -17,11 +17,11 @@ bool RemoveEmptyMessages(PersQueue::V1::MigrationStreamingReadServerMessage::Dat } // TODO: remove after refactor -bool RemoveEmptyMessages(PersQueue::V1::StreamingReadServerMessage::DataBatch& data) { - auto batchRemover = [&](PersQueue::V1::StreamingReadServerMessage::DataBatch::Batch& batch) -> bool { +bool RemoveEmptyMessages(PersQueue::V1::StreamingReadServerMessage::ReadResponse& data) { + auto batchRemover = [&](PersQueue::V1::StreamingReadServerMessage::ReadResponse::Batch& batch) -> bool { return batch.message_data_size() == 0; }; - auto partitionDataRemover = [&](PersQueue::V1::StreamingReadServerMessage::DataBatch::PartitionData& partition) -> bool { + auto partitionDataRemover = [&](PersQueue::V1::StreamingReadServerMessage::ReadResponse::PartitionData& partition) -> bool { NProtoBuf::RemoveRepeatedFieldItemIf(partition.mutable_batches(), batchRemover); return partition.batches_size() == 0; }; diff --git a/ydb/services/persqueue_v1/actors/helpers.h b/ydb/services/persqueue_v1/actors/helpers.h index ed1b243962a..1a46d93a308 100644 --- a/ydb/services/persqueue_v1/actors/helpers.h +++ b/ydb/services/persqueue_v1/actors/helpers.h @@ -9,6 +9,6 @@ using namespace Ydb; bool RemoveEmptyMessages(PersQueue::V1::MigrationStreamingReadServerMessage::DataBatch& data); // TODO: remove after grpc refactor -bool RemoveEmptyMessages(PersQueue::V1::StreamingReadServerMessage::DataBatch& data); +bool RemoveEmptyMessages(PersQueue::V1::StreamingReadServerMessage::ReadResponse& data); } diff --git a/ydb/services/persqueue_v1/actors/partition_actor.cpp b/ydb/services/persqueue_v1/actors/partition_actor.cpp index f37fc307963..7139e22936a 100644 --- a/ydb/services/persqueue_v1/actors/partition_actor.cpp +++ b/ydb/services/persqueue_v1/actors/partition_actor.cpp @@ -254,25 +254,62 @@ void TPartitionActor::Handle(const TEvPQProxy::TEvRestartPipe::TPtr&, const TAct } } -// TODO: keep only Migration version -template<typename TServerMessage> +TString GetBatchSourceId(PersQueue::V1::MigrationStreamingReadServerMessage::DataBatch::Batch* batch) { + Y_VERIFY(batch); + return batch->source_id(); +} + +TString GetBatchSourceId(PersQueue::V1::StreamingReadServerMessage::ReadResponse::Batch* batch) { + Y_VERIFY(batch); + return batch->message_group_id(); +} + +void SetBatchSourceId(PersQueue::V1::MigrationStreamingReadServerMessage::DataBatch::Batch* batch, TString value) { + Y_VERIFY(batch); + batch->set_source_id(std::move(value)); +} + +void SetBatchSourceId(PersQueue::V1::StreamingReadServerMessage::ReadResponse::Batch* batch, TString value) { + Y_VERIFY(batch); + batch->set_message_group_id(std::move(value)); +} + +void SetBatchExtraField(PersQueue::V1::MigrationStreamingReadServerMessage::DataBatch::Batch* batch, TString key, TString value) { + Y_VERIFY(batch); + auto* item = batch->add_extra_fields(); + item->set_key(std::move(key)); + item->set_value(std::move(value)); +} + +void SetBatchExtraField(PersQueue::V1::StreamingReadServerMessage::ReadResponse::Batch* batch, TString key, TString value) { + Y_VERIFY(batch); + (*batch->mutable_session_meta()->mutable_value())[key] = std::move(value); +} + +template<typename TReadResponse> bool FillBatchedData( - typename TServerMessage::DataBatch * data, const NKikimrClient::TCmdReadResult& res, + TReadResponse* data, const NKikimrClient::TCmdReadResult& res, const TPartitionId& Partition, ui64 ReadIdToResponse, ui64& ReadOffset, ui64& WTime, ui64 EndOffset, const NPersQueue::TTopicConverterPtr& topic, const TActorContext& ctx) { - + constexpr bool UseMigrationProtocol = std::is_same_v<TReadResponse, PersQueue::V1::MigrationStreamingReadServerMessage::DataBatch>; auto* partitionData = data->add_partition_data(); - partitionData->mutable_topic()->set_path(topic->GetFederationPath()); - partitionData->set_cluster(topic->GetCluster()); - partitionData->set_partition(Partition.Partition); - partitionData->set_deprecated_topic(topic->GetClientsideName()); - partitionData->mutable_cookie()->set_assign_id(Partition.AssignId); - partitionData->mutable_cookie()->set_partition_cookie(ReadIdToResponse); + + if constexpr (UseMigrationProtocol) { + partitionData->mutable_topic()->set_path(topic->GetFederationPath()); + partitionData->set_cluster(topic->GetCluster()); + partitionData->set_partition(Partition.Partition); + partitionData->set_deprecated_topic(topic->GetClientsideName()); + partitionData->mutable_cookie()->set_assign_id(Partition.AssignId); + partitionData->mutable_cookie()->set_partition_cookie(ReadIdToResponse); + + } else { + partitionData->set_partition_session_id(Partition.AssignId); + } bool hasOffset = false; bool hasData = false; - typename TServerMessage::DataBatch::Batch* currentBatch = nullptr; + typename TReadResponse::Batch* currentBatch = nullptr; for (ui32 i = 0; i < res.ResultSize(); ++i) { const auto& r = res.GetResult(i); WTime = r.GetWriteTimestampMS(); @@ -294,42 +331,34 @@ bool FillBatchedData( sourceId = NPQ::NSourceIdEncoding::Decode(r.GetSourceId()); } - if (!currentBatch || currentBatch->write_timestamp_ms() != r.GetWriteTimestampMS() || currentBatch->source_id() != sourceId) { + if (!currentBatch + || static_cast<i64>(currentBatch->write_timestamp_ms()) != static_cast<i64>(r.GetWriteTimestampMS()) + || GetBatchSourceId(currentBatch) != sourceId) { // If write time and source id are the same, the rest fields will be the same too. currentBatch = partitionData->add_batches(); currentBatch->set_write_timestamp_ms(r.GetWriteTimestampMS()); - currentBatch->set_source_id(sourceId); + SetBatchSourceId(currentBatch, std::move(sourceId)); if (proto.HasMeta()) { const auto& header = proto.GetMeta(); if (header.HasServer()) { - auto* item = currentBatch->add_extra_fields(); - item->set_key("server"); - item->set_value(header.GetServer()); + SetBatchExtraField(currentBatch, "server", header.GetServer()); } if (header.HasFile()) { - auto* item = currentBatch->add_extra_fields(); - item->set_key("file"); - item->set_value(header.GetFile()); + SetBatchExtraField(currentBatch, "file", header.GetFile()); } if (header.HasIdent()) { - auto* item = currentBatch->add_extra_fields(); - item->set_key("ident"); - item->set_value(header.GetIdent()); + SetBatchExtraField(currentBatch, "ident", header.GetIdent()); } if (header.HasLogType()) { - auto* item = currentBatch->add_extra_fields(); - item->set_key("logtype"); - item->set_value(header.GetLogType()); + SetBatchExtraField(currentBatch, "logtype", header.GetLogType()); } } if (proto.HasExtraFields()) { const auto& map = proto.GetExtraFields(); for (const auto& kv : map.GetItems()) { - auto* item = currentBatch->add_extra_fields(); - item->set_key(kv.GetKey()); - item->set_value(kv.GetValue()); + SetBatchExtraField(currentBatch, kv.GetKey(), kv.GetValue()); } } @@ -506,11 +535,11 @@ void TPartitionActor::Handle(TEvPersQueue::TEvResponse::TPtr& ev, const TActorCo bool hasData = false; if (UseMigrationProtocol) { - auto* data = migrationResponse.mutable_data_batch(); - hasData = FillBatchedData<MigrationStreamingReadServerMessage>(data, res, Partition, ReadIdToResponse, ReadOffset, WTime, EndOffset, Topic, ctx); + typename MigrationStreamingReadServerMessage::DataBatch* data = migrationResponse.mutable_data_batch(); + hasData = FillBatchedData<MigrationStreamingReadServerMessage::DataBatch>(data, res, Partition, ReadIdToResponse, ReadOffset, WTime, EndOffset, Topic, ctx); } else { - auto* data = response.mutable_data_batch(); - hasData = FillBatchedData<StreamingReadServerMessage>(data, res, Partition, ReadIdToResponse, ReadOffset, WTime, EndOffset, Topic, ctx); + StreamingReadServerMessage::ReadResponse* data = response.mutable_read_response(); + hasData = FillBatchedData<StreamingReadServerMessage::ReadResponse>(data, res, Partition, ReadIdToResponse, ReadOffset, WTime, EndOffset, Topic, ctx); } WriteTimestampEstimateMs = Max(WriteTimestampEstimateMs, WTime); diff --git a/ydb/services/persqueue_v1/actors/read_session_actor.ipp b/ydb/services/persqueue_v1/actors/read_session_actor.ipp index 2cbc42deebb..a329268ae85 100644 --- a/ydb/services/persqueue_v1/actors/read_session_actor.ipp +++ b/ydb/services/persqueue_v1/actors/read_session_actor.ipp @@ -125,7 +125,7 @@ void TReadSessionActor<UseMigrationProtocol>::Handle(typename IContext::TEvReadF if constexpr (UseMigrationProtocol) { return request.assign_id(); } else { - return request.partition_stream_id(); + return request.partition_session_id(); } }(); Y_VERIFY(Partitions.find(id) != Partitions.end()); @@ -255,8 +255,8 @@ if (!partId.DiscoveryConverter->IsValid()) { \ ctx.Send(ctx.SelfID, new TEvPQProxy::TEvRead()); // Proto read message have no parameters break; } - case TClientMessage::kPartitionStreamStatusRequest: { - GET_PART_ID_OR_EXIT(request.partition_stream_status_request()); + case TClientMessage::kPartitionSessionStatusRequest: { + GET_PART_ID_OR_EXIT(request.partition_session_status_request()); ctx.Send(ctx.SelfID, new TEvPQProxy::TEvGetStatus(partId)); if (!Request->GetStreamCtx()->Read()) { LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " grpc read failed at start"); @@ -266,8 +266,8 @@ if (!partId.DiscoveryConverter->IsValid()) { \ break; } - case TClientMessage::kDestroyPartitionStreamResponse: { - GET_PART_ID_OR_EXIT(request.destroy_partition_stream_response()); + case TClientMessage::kStopPartitionSessionResponse: { + GET_PART_ID_OR_EXIT(request.stop_partition_session_response()); ctx.Send(ctx.SelfID, new TEvPQProxy::TEvReleased(partId)); if (!Request->GetStreamCtx()->Read()) { LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " grpc read failed at start"); @@ -277,14 +277,14 @@ if (!partId.DiscoveryConverter->IsValid()) { \ break; } - case TClientMessage::kCreatePartitionStreamResponse: { - const auto& req = request.create_partition_stream_response(); + case TClientMessage::kStartPartitionSessionResponse: { + const auto& req = request.start_partition_session_response(); const ui64 readOffset = req.read_offset(); const ui64 commitOffset = req.commit_offset(); const bool verifyReadOffset = req.verify_read_offset(); - GET_PART_ID_OR_EXIT(request.create_partition_stream_response()); + GET_PART_ID_OR_EXIT(request.start_partition_session_response()); ctx.Send(ctx.SelfID, new TEvPQProxy::TEvStartRead(partId, readOffset, commitOffset, verifyReadOffset)); if (!Request->GetStreamCtx()->Read()) { LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " grpc read failed at start"); @@ -304,7 +304,7 @@ if (!partId.DiscoveryConverter->IsValid()) { \ THashMap<ui64, TEvPQProxy::TCommitRange> commitRange; for (auto& pc: req.commits()) { - auto id = pc.partition_stream_id(); + auto id = pc.partition_session_id(); for (auto& c: pc.offsets()) { commitRange[id].Ranges.push_back(std::make_pair(c.start_offset(), c.end_offset())); } @@ -579,7 +579,7 @@ void TReadSessionActor<UseMigrationProtocol>::Handle(TEvPQProxy::TEvCommitDone:: } else { auto c = result.mutable_commit_response()->add_partitions_committed_offsets(); - c->set_partition_stream_id(assignId); + c->set_partition_session_id(assignId); c->set_committed_offset(ev->Get()->Offset); } } @@ -660,10 +660,8 @@ void TReadSessionActor<UseMigrationProtocol>::Handle(typename TEvReadInit::TPtr& MaxReadSize = NormalizeMaxReadSize(init.read_params().max_read_size()); } else { RangesMode = true; - // MaxReadMessagesCount = NormalizeMaxReadMessagesCount(0); - // MaxReadSize = NormalizeMaxReadSize(0); - MaxReadMessagesCount = NormalizeMaxReadMessagesCount(init.read_params().max_read_messages_count()); - MaxReadSize = NormalizeMaxReadSize(init.read_params().max_read_size()); + MaxReadMessagesCount = NormalizeMaxReadMessagesCount(0); + MaxReadSize = NormalizeMaxReadSize(0); } if (init.max_lag_duration_ms() < 0) { CloseSession("max_lag_duration_ms must be nonnegative number", PersQueue::ErrorCode::BAD_REQUEST, ctx); @@ -1069,13 +1067,13 @@ void TReadSessionActor<UseMigrationProtocol>::Handle(TEvPQProxy::TEvPartitionSta result.mutable_assigned()->set_end_offset(ev->Get()->EndOffset); } else { - result.mutable_create_partition_stream_request()->mutable_partition_stream()->set_topic(it->second.Topic->GetFederationPath()); - result.mutable_create_partition_stream_request()->mutable_partition_stream()->set_cluster(it->second.Topic->GetCluster()); - result.mutable_create_partition_stream_request()->mutable_partition_stream()->set_partition_id(ev->Get()->Partition.Partition); - result.mutable_create_partition_stream_request()->mutable_partition_stream()->set_partition_stream_id(it->first); + result.mutable_start_partition_session_request()->mutable_partition_session()->set_topic(it->second.Topic->GetFederationPath()); + result.mutable_start_partition_session_request()->mutable_partition_session()->set_cluster(it->second.Topic->GetCluster()); + result.mutable_start_partition_session_request()->mutable_partition_session()->set_partition_id(ev->Get()->Partition.Partition); + result.mutable_start_partition_session_request()->mutable_partition_session()->set_partition_session_id(it->first); - result.mutable_create_partition_stream_request()->set_committed_offset(ev->Get()->Offset); - result.mutable_create_partition_stream_request()->set_end_offset(ev->Get()->EndOffset); + result.mutable_start_partition_session_request()->set_committed_offset(ev->Get()->Offset); + result.mutable_start_partition_session_request()->set_end_offset(ev->Get()->EndOffset); } LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " sending to client create partition stream event"); @@ -1110,11 +1108,11 @@ void TReadSessionActor<UseMigrationProtocol>::Handle(TEvPQProxy::TEvPartitionSta result.mutable_partition_status()->set_write_watermark_ms(ev->Get()->WriteTimestampEstimateMs); } else { - result.mutable_partition_stream_status_response()->set_partition_stream_id(it->first); + result.mutable_partition_session_status_response()->set_partition_session_id(it->first); - result.mutable_partition_stream_status_response()->set_committed_offset(ev->Get()->Offset); - result.mutable_partition_stream_status_response()->set_end_offset(ev->Get()->EndOffset); - result.mutable_partition_stream_status_response()->set_written_at_watermark_ms(ev->Get()->WriteTimestampEstimateMs); + result.mutable_partition_session_status_response()->set_committed_offset(ev->Get()->Offset); + result.mutable_partition_session_status_response()->set_end_offset(ev->Get()->EndOffset); + result.mutable_partition_session_status_response()->set_written_at_watermark_ms(ev->Get()->WriteTimestampEstimateMs); } auto pp = it->second.Partition; @@ -1154,9 +1152,9 @@ void TReadSessionActor<UseMigrationProtocol>::SendReleaseSignalToClient(const ty result.mutable_release()->set_commit_offset(it->second.Offset); } else { - result.mutable_destroy_partition_stream_request()->set_partition_stream_id(it->second.Partition.AssignId); - result.mutable_destroy_partition_stream_request()->set_graceful(!kill); - result.mutable_destroy_partition_stream_request()->set_committed_offset(it->second.Offset); + result.mutable_stop_partition_session_request()->set_partition_session_id(it->second.Partition.AssignId); + result.mutable_stop_partition_session_request()->set_graceful(!kill); + result.mutable_stop_partition_session_request()->set_committed_offset(it->second.Offset); } auto pp = it->second.Partition; @@ -1478,10 +1476,18 @@ void TReadSessionActor<UseMigrationProtocol>::Handle(TEvPQProxy::TEvRead::TPtr& template<typename TServerMessage> i64 TFormedReadResponse<TServerMessage>::ApplyResponse(TServerMessage&& resp) { - Y_VERIFY(resp.data_batch().partition_data_size() == 1); + constexpr bool UseMigrationProtocol = std::is_same_v<TServerMessage, PersQueue::V1::MigrationStreamingReadServerMessage>; + if constexpr (UseMigrationProtocol) { + Y_VERIFY(resp.data_batch().partition_data_size() == 1); + Response.mutable_data_batch()->add_partition_data()->Swap(resp.mutable_data_batch()->mutable_partition_data(0)); + + } else { + Y_VERIFY(resp.read_response().partition_data_size() == 1); + Response.mutable_read_response()->add_partition_data()->Swap(resp.mutable_read_response()->mutable_partition_data(0)); + } + Response.set_status(Ydb::StatusIds::SUCCESS); - Response.mutable_data_batch()->add_partition_data()->Swap(resp.mutable_data_batch()->mutable_partition_data(0)); i64 prev = Response.ByteSize(); std::swap<i64>(prev, ByteSize); return ByteSize - prev; @@ -1495,16 +1501,27 @@ void TReadSessionActor<UseMigrationProtocol>::Handle(typename TEvReadResponse::T THolder<TEvReadResponse> event(ev->Release()); - Y_VERIFY(event->Response.data_batch().partition_data_size() == 1); - const ui64 partitionCookie = event->Response.data_batch().partition_data(0).cookie().partition_cookie(); - Y_VERIFY(partitionCookie != 0); // cookie is assigned - const ui64 assignId = event->Response.data_batch().partition_data(0).cookie().assign_id(); + ui64 partitionCookie; + ui64 assignId; + if constexpr (UseMigrationProtocol) { + Y_VERIFY(event->Response.data_batch().partition_data_size() == 1); + partitionCookie = event->Response.data_batch().partition_data(0).cookie().partition_cookie(); + Y_VERIFY(partitionCookie != 0); // cookie is assigned + assignId = event->Response.data_batch().partition_data(0).cookie().assign_id(); + + } else { + Y_VERIFY(event->Response.read_response().partition_data_size() == 1); + assignId = event->Response.read_response().partition_data(0).partition_session_id(); + } + const auto partitionIt = Partitions.find(assignId); Y_VERIFY(partitionIt != Partitions.end()); Y_VERIFY(partitionIt->second.Reading); partitionIt->second.Reading = false; - partitionIt->second.ReadIdToResponse = partitionCookie + 1; + if constexpr (UseMigrationProtocol) { + partitionIt->second.ReadIdToResponse = partitionCookie + 1; + } auto it = PartitionToReadResponse.find(sender); Y_VERIFY(it != PartitionToReadResponse.end()); @@ -1558,7 +1575,13 @@ void TReadSessionActor<UseMigrationProtocol>::ProcessAnswer(const TActorContext& Y_VERIFY(formedResponse->RequestsInfly == 0); const ui64 diff = formedResponse->Response.ByteSize(); - const bool hasMessages = RemoveEmptyMessages(*formedResponse->Response.mutable_data_batch()); + bool hasMessages; + if constexpr(UseMigrationProtocol) { + hasMessages = RemoveEmptyMessages(*formedResponse->Response.mutable_data_batch()); + } else { + hasMessages = RemoveEmptyMessages(*formedResponse->Response.mutable_read_response()); + } + if (hasMessages) { LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " response to read " << formedResponse->Guid); diff --git a/ydb/services/persqueue_v1/persqueue_ut.cpp b/ydb/services/persqueue_v1/persqueue_ut.cpp index 9fedc4d6627..93b846a20c2 100644 --- a/ydb/services/persqueue_v1/persqueue_ut.cpp +++ b/ydb/services/persqueue_v1/persqueue_ut.cpp @@ -359,7 +359,6 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) { req.mutable_init_request()->set_consumer("user"); req.mutable_init_request()->set_read_only_original(true); - req.mutable_init_request()->mutable_read_params()->set_max_read_messages_count(1); if (!readStream->Write(req)) { ythrow yexception() << "write fail"; @@ -369,7 +368,7 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) { UNIT_ASSERT(resp.server_message_case() == StreamingReadServerMessage::kInitResponse); //send some reads req.Clear(); - req.mutable_read_request(); + req.mutable_read_request()->set_request_uncompressed_size(256); for (ui32 i = 0; i < 10; ++i) { if (!readStream->Write(req)) { ythrow yexception() << "write fail"; @@ -387,31 +386,31 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) { TVector<i64> partition_ids; //lock partition UNIT_ASSERT(readStream->Read(&resp)); - UNIT_ASSERT(resp.server_message_case() == StreamingReadServerMessage::kCreatePartitionStreamRequest); - UNIT_ASSERT(resp.create_partition_stream_request().partition_stream().topic() == "acc/topic1"); - UNIT_ASSERT(resp.create_partition_stream_request().partition_stream().cluster() == "dc1"); - partition_ids.push_back(resp.create_partition_stream_request().partition_stream().partition_id()); + UNIT_ASSERT(resp.server_message_case() == StreamingReadServerMessage::kStartPartitionSessionRequest); + UNIT_ASSERT(resp.start_partition_session_request().partition_session().topic() == "acc/topic1"); + UNIT_ASSERT(resp.start_partition_session_request().partition_session().cluster() == "dc1"); + partition_ids.push_back(resp.start_partition_session_request().partition_session().partition_id()); - assignId = resp.create_partition_stream_request().partition_stream().partition_stream_id(); + assignId = resp.start_partition_session_request().partition_session().partition_session_id(); req.Clear(); - req.mutable_create_partition_stream_response()->set_partition_stream_id(assignId); + req.mutable_start_partition_session_response()->set_partition_session_id(assignId); if (!readStream->Write(req)) { ythrow yexception() << "write fail"; } resp.Clear(); UNIT_ASSERT(readStream->Read(&resp)); - UNIT_ASSERT(resp.server_message_case() == StreamingReadServerMessage::kCreatePartitionStreamRequest); - UNIT_ASSERT(resp.create_partition_stream_request().partition_stream().topic() == "acc/topic1"); - UNIT_ASSERT(resp.create_partition_stream_request().partition_stream().cluster() == "dc1"); - partition_ids.push_back(resp.create_partition_stream_request().partition_stream().partition_id()); + UNIT_ASSERT(resp.server_message_case() == StreamingReadServerMessage::kStartPartitionSessionRequest); + UNIT_ASSERT(resp.start_partition_session_request().partition_session().topic() == "acc/topic1"); + UNIT_ASSERT(resp.start_partition_session_request().partition_session().cluster() == "dc1"); + partition_ids.push_back(resp.start_partition_session_request().partition_session().partition_id()); std::sort(partition_ids.begin(), partition_ids.end()); UNIT_ASSERT((partition_ids == TVector<i64>{0, 1})); - assignId = resp.create_partition_stream_request().partition_stream().partition_stream_id(); + assignId = resp.start_partition_session_request().partition_session().partition_session_id(); req.Clear(); - req.mutable_create_partition_stream_response()->set_partition_stream_id(assignId); + req.mutable_start_partition_session_response()->set_partition_session_id(assignId); if (!readStream->Write(req)) { ythrow yexception() << "write fail"; @@ -433,7 +432,6 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) { req.mutable_init_request()->set_consumer("user"); req.mutable_init_request()->set_read_only_original(true); - req.mutable_init_request()->mutable_read_params()->set_max_read_messages_count(1); if (!readStreamSecond->Write(req)) { ythrow yexception() << "write fail"; @@ -453,25 +451,25 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) { //lock partition UNIT_ASSERT(readStream->Read(&resp)); Cerr << "=== Got response (expect destroy): " << resp.ShortDebugString() << Endl; - UNIT_ASSERT(resp.server_message_case() == StreamingReadServerMessage::kDestroyPartitionStreamRequest); - UNIT_ASSERT(resp.destroy_partition_stream_request().graceful()); - auto stream_id = resp.destroy_partition_stream_request().partition_stream_id(); + UNIT_ASSERT(resp.server_message_case() == StreamingReadServerMessage::kStopPartitionSessionRequest); + UNIT_ASSERT(resp.stop_partition_session_request().graceful()); + auto stream_id = resp.stop_partition_session_request().partition_session_id(); req.Clear(); - req.mutable_destroy_partition_stream_response()->set_partition_stream_id(stream_id); + req.mutable_stop_partition_session_response()->set_partition_session_id(stream_id); if (!readStream->Write(req)) { ythrow yexception() << "write fail"; } resp.Clear(); UNIT_ASSERT(readStreamSecond->Read(&resp)); Cerr << "=== Got response (expect create): " << resp.ShortDebugString() << Endl; - UNIT_ASSERT(resp.server_message_case() == StreamingReadServerMessage::kCreatePartitionStreamRequest); - UNIT_ASSERT(resp.create_partition_stream_request().partition_stream().topic() == "acc/topic1"); - UNIT_ASSERT(resp.create_partition_stream_request().partition_stream().cluster() == "dc1"); + UNIT_ASSERT(resp.server_message_case() == StreamingReadServerMessage::kStartPartitionSessionRequest); + UNIT_ASSERT(resp.start_partition_session_request().partition_session().topic() == "acc/topic1"); + UNIT_ASSERT(resp.start_partition_session_request().partition_session().cluster() == "dc1"); - assignId = resp.create_partition_stream_request().partition_stream().partition_stream_id(); + assignId = resp.start_partition_session_request().partition_session().partition_session_id(); req.Clear(); - req.mutable_create_partition_stream_response()->set_partition_stream_id(assignId); + req.mutable_start_partition_session_response()->set_partition_session_id(assignId); if (!readStreamSecond->Write(req)) { ythrow yexception() << "write fail"; @@ -488,14 +486,14 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) { //lock partition UNIT_ASSERT(readStream->Read(&resp)); Cerr << "=== Got response (expect forceful destroy): " << resp.ShortDebugString() << Endl; - UNIT_ASSERT(resp.server_message_case() == StreamingReadServerMessage::kDestroyPartitionStreamRequest); - UNIT_ASSERT(!resp.destroy_partition_stream_request().graceful()); + UNIT_ASSERT(resp.server_message_case() == StreamingReadServerMessage::kStopPartitionSessionRequest); + UNIT_ASSERT(!resp.stop_partition_session_request().graceful()); resp.Clear(); UNIT_ASSERT(readStreamSecond->Read(&resp)); Cerr << "=== Got response (expect forceful destroy): " << resp.ShortDebugString() << Endl; - UNIT_ASSERT(resp.server_message_case() == StreamingReadServerMessage::kDestroyPartitionStreamRequest); - UNIT_ASSERT(!resp.destroy_partition_stream_request().graceful()); + UNIT_ASSERT(resp.server_message_case() == StreamingReadServerMessage::kStopPartitionSessionRequest); + UNIT_ASSERT(!resp.stop_partition_session_request().graceful()); } } @@ -520,7 +518,6 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) { req.mutable_init_request()->set_consumer("user"); req.mutable_init_request()->set_read_only_original(true); - req.mutable_init_request()->mutable_read_params()->set_max_read_messages_count(1); if (!readStream->Write(req)) { ythrow yexception() << "write fail"; @@ -530,7 +527,7 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) { UNIT_ASSERT(resp.server_message_case() == StreamingReadServerMessage::kInitResponse); //send some reads req.Clear(); - req.mutable_read_request(); + req.mutable_read_request()->set_request_uncompressed_size(256); for (ui32 i = 0; i < 10; ++i) { if (!readStream->Write(req)) { ythrow yexception() << "write fail"; @@ -546,16 +543,16 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) { //lock partition UNIT_ASSERT(readStream->Read(&resp)); - UNIT_ASSERT(resp.server_message_case() == StreamingReadServerMessage::kCreatePartitionStreamRequest); - UNIT_ASSERT(resp.create_partition_stream_request().partition_stream().topic() == "acc/topic1"); - UNIT_ASSERT(resp.create_partition_stream_request().partition_stream().cluster() == "dc1"); - UNIT_ASSERT(resp.create_partition_stream_request().partition_stream().partition_id() == 0); + UNIT_ASSERT(resp.server_message_case() == StreamingReadServerMessage::kStartPartitionSessionRequest); + UNIT_ASSERT(resp.start_partition_session_request().partition_session().topic() == "acc/topic1"); + UNIT_ASSERT(resp.start_partition_session_request().partition_session().cluster() == "dc1"); + UNIT_ASSERT(resp.start_partition_session_request().partition_session().partition_id() == 0); - assignId = resp.create_partition_stream_request().partition_stream().partition_stream_id(); + assignId = resp.start_partition_session_request().partition_session().partition_session_id(); req.Clear(); - req.mutable_create_partition_stream_response()->set_partition_stream_id(assignId); + req.mutable_start_partition_session_response()->set_partition_session_id(assignId); - req.mutable_create_partition_stream_response()->set_read_offset(10); + req.mutable_start_partition_session_response()->set_read_offset(10); if (!readStream->Write(req)) { ythrow yexception() << "write fail"; } @@ -575,14 +572,15 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) { //check read results StreamingReadServerMessage resp; - for (ui32 i = 10; i < 16; ++i) { + for (ui32 i = 10; i < 16; ) { UNIT_ASSERT(readStream->Read(&resp)); Cerr << "Got read response " << resp << "\n"; - UNIT_ASSERT_C(resp.server_message_case() == StreamingReadServerMessage::kDataBatch, resp); - UNIT_ASSERT(resp.data_batch().partition_data_size() == 1); - UNIT_ASSERT(resp.data_batch().partition_data(0).batches_size() == 1); - UNIT_ASSERT(resp.data_batch().partition_data(0).batches(0).message_data_size() == 1); - UNIT_ASSERT(resp.data_batch().partition_data(0).batches(0).message_data(0).offset() == i); + UNIT_ASSERT_C(resp.server_message_case() == StreamingReadServerMessage::kReadResponse, resp); + UNIT_ASSERT(resp.read_response().partition_data_size() == 1); + UNIT_ASSERT(resp.read_response().partition_data(0).batches_size() == 1); + UNIT_ASSERT(resp.read_response().partition_data(0).batches(0).message_data_size() >= 1); + UNIT_ASSERT(resp.read_response().partition_data(0).batches(0).message_data(0).offset() == i); + i += resp.read_response().partition_data(0).batches(0).message_data_size(); } // send commit, await commitDone @@ -591,7 +589,7 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) { StreamingReadServerMessage resp; auto commit = req.mutable_commit_request()->add_commits(); - commit->set_partition_stream_id(assignId); + commit->set_partition_session_id(assignId); auto offsets = commit->add_offsets(); offsets->set_start_offset(0); @@ -603,7 +601,7 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) { UNIT_ASSERT(readStream->Read(&resp)); UNIT_ASSERT_C(resp.server_message_case() == StreamingReadServerMessage::kCommitResponse, resp); UNIT_ASSERT(resp.commit_response().partitions_committed_offsets_size() == 1); - UNIT_ASSERT(resp.commit_response().partitions_committed_offsets(0).partition_stream_id() == assignId); + UNIT_ASSERT(resp.commit_response().partitions_committed_offsets(0).partition_session_id() == assignId); UNIT_ASSERT(resp.commit_response().partitions_committed_offsets(0).committed_offset() == 13); } @@ -612,17 +610,17 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) { StreamingReadClientMessage req; StreamingReadServerMessage resp; - req.mutable_partition_stream_status_request()->set_partition_stream_id(assignId); + req.mutable_partition_session_status_request()->set_partition_session_id(assignId); if (!readStream->Write(req)) { ythrow yexception() << "write fail"; } UNIT_ASSERT(readStream->Read(&resp)); - UNIT_ASSERT_C(resp.server_message_case() == StreamingReadServerMessage::kPartitionStreamStatusResponse, resp); - UNIT_ASSERT(resp.partition_stream_status_response().partition_stream_id() == assignId); - UNIT_ASSERT(resp.partition_stream_status_response().committed_offset() == 13); - UNIT_ASSERT(resp.partition_stream_status_response().end_offset() == 16); - UNIT_ASSERT(resp.partition_stream_status_response().written_at_watermark_ms() > 0); + UNIT_ASSERT_C(resp.server_message_case() == StreamingReadServerMessage::kPartitionSessionStatusResponse, resp); + UNIT_ASSERT(resp.partition_session_status_response().partition_session_id() == assignId); + UNIT_ASSERT(resp.partition_session_status_response().committed_offset() == 13); + UNIT_ASSERT(resp.partition_session_status_response().end_offset() == 16); + UNIT_ASSERT(resp.partition_session_status_response().written_at_watermark_ms() > 0); } // send update token request, await response @@ -956,8 +954,6 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) { req.mutable_init_request()->set_consumer("user"); req.mutable_init_request()->set_read_only_original(true); - req.mutable_init_request()->mutable_read_params()->set_max_read_messages_count(1000); - if (!readStream->Write(req)) { ythrow yexception() << "write fail"; } @@ -971,7 +967,7 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) { Sleep(TDuration::Seconds(5)); for (ui32 i = 0; i < 10; ++i) { req.Clear(); - req.mutable_read_request(); + req.mutable_read_request()->set_request_uncompressed_size(256000); if (!readStream->Write(req)) { ythrow yexception() << "write fail"; @@ -984,16 +980,16 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) { for (ui32 i = 0; i < 2;) { StreamingReadServerMessage resp; UNIT_ASSERT(readStream->Read(&resp)); - if (resp.server_message_case() == StreamingReadServerMessage::kCreatePartitionStreamRequest) { - auto assignId = resp.create_partition_stream_request().partition_stream().partition_stream_id(); + if (resp.server_message_case() == StreamingReadServerMessage::kStartPartitionSessionRequest) { + auto assignId = resp.start_partition_session_request().partition_session().partition_session_id(); StreamingReadClientMessage req; - req.mutable_create_partition_stream_response()->set_partition_stream_id(assignId); + req.mutable_start_partition_session_response()->set_partition_session_id(assignId); UNIT_ASSERT(readStream->Write(req)); continue; } - UNIT_ASSERT_C(resp.server_message_case() == StreamingReadServerMessage::kDataBatch, resp); - i += resp.data_batch().partition_data_size(); + UNIT_ASSERT_C(resp.server_message_case() == StreamingReadServerMessage::kReadResponse, resp); + i += resp.read_response().partition_data_size(); } } diff --git a/ydb/services/persqueue_v1/ut/pq_data_writer.h b/ydb/services/persqueue_v1/ut/pq_data_writer.h index caef8940601..34e7f3bf94d 100644 --- a/ydb/services/persqueue_v1/ut/pq_data_writer.h +++ b/ydb/services/persqueue_v1/ut/pq_data_writer.h @@ -244,7 +244,7 @@ private: req.mutable_init_request()->set_topic(topic); req.mutable_init_request()->set_message_group_id(SourceId_); - req.mutable_init_request()->set_max_supported_block_format_version(0); + req.mutable_init_request()->set_max_supported_format_version(0); (*req.mutable_init_request()->mutable_session_meta())["key"] = "value"; |
