summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorildar-khisam <[email protected]>2022-05-24 11:57:31 +0300
committerildar-khisam <[email protected]>2022-05-24 11:57:31 +0300
commitcdb547ea095e178dfd356b78583c919b4243f7db (patch)
treea6ad5c0b304c2a7849bb2d4c3f9197745e257765
parent0ece656f216a197fb7535ac68d399dee6b8ddd84 (diff)
rename pq grpc proto messages
rename StreamingRead proto messages ref:c4be9b0b221e1569d042c92d7697d3f62f22039b
-rw-r--r--ydb/public/api/protos/ydb_persqueue_v1.proto707
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.cpp2
-rw-r--r--ydb/services/persqueue_v1/actors/helpers.cpp6
-rw-r--r--ydb/services/persqueue_v1/actors/helpers.h2
-rw-r--r--ydb/services/persqueue_v1/actors/partition_actor.cpp93
-rw-r--r--ydb/services/persqueue_v1/actors/read_session_actor.ipp93
-rw-r--r--ydb/services/persqueue_v1/persqueue_ut.cpp114
-rw-r--r--ydb/services/persqueue_v1/ut/pq_data_writer.h2
8 files changed, 336 insertions, 683 deletions
diff --git a/ydb/public/api/protos/ydb_persqueue_v1.proto b/ydb/public/api/protos/ydb_persqueue_v1.proto
index f3bd203fd14..af0f7c1ed0d 100644
--- a/ydb/public/api/protos/ydb_persqueue_v1.proto
+++ b/ydb/public/api/protos/ydb_persqueue_v1.proto
@@ -24,7 +24,7 @@ enum Codec {
CODEC_ZSTD = 4;
}
-message SessionMetaValue {
+message MetaValue {
map<string, string> value = 1;
}
@@ -70,7 +70,7 @@ message StreamingWriteClientMessage {
// Zero means any group.
int64 partition_group_id = 4;
- int64 max_supported_block_format_version = 5;
+ int64 max_supported_format_version = 5;
string session_id = 100;
// 0 for first init message and incremental value for connect retries. Used for server logging.
@@ -128,18 +128,18 @@ message StreamingWriteClientMessage {
* UpdateTokenResponse - acknowledgment of reauthentication and reauthorization.
*/
message StreamingWriteServerMessage {
- oneof server_message {
- InitResponse init_response = 3;
- BatchWriteResponse batch_write_response = 4;
- UpdateTokenResponse update_token_response = 5;
- }
-
// Server status of response.
Ydb.StatusIds.StatusCode status = 1;
// Issues if any.
repeated Ydb.Issue.IssueMessage issues = 2;
+ oneof server_message {
+ InitResponse init_response = 3;
+ BatchWriteResponse batch_write_response = 4;
+ UpdateTokenResponse update_token_response = 5;
+ }
+
// Response for handshake.
message InitResponse {
// Last persisted message's sequence number for this message group.
@@ -235,34 +235,55 @@ message CommitOffsetRange {
uint64 end_offset = 3;
}
-// TODO: replace with it actual protocol client message
+/**
+ * Message that represents concrete partition session.
+ */
+ message PartitionSession {
+ // Topic path of partition.
+ string topic = 1;
+ // Cluster of topic instance.
+ string cluster = 2;
+ // Partition identifier. Explicit only for debug purposes.
+ int64 partition_id = 3;
+ // Partition group identifier. Explicit only for debug purposes.
+ int64 partition_group_id = 4;
+
+ // Identifier of partition session. Unique inside one RPC call.
+ int64 partition_session_id = 6;
+
+ // Opaque blob. Provide it with partition stream in state for session reconnects.
+ bytes connection_meta = 7;
+}
+
/**
* Request for read session. Contains one of:
* InitRequest - handshake request.
* ReadRequest - request for data.
* CommitRequest - request for commit of some read data.
- * CreatePartitionStreamResponse - signal for server that client is ready to get data from partition.
- * DestroyPartitionStreamResponse - signal for server that client finished working with partition. Must be sent only after corresponding Release request from server.
- * StopReadRequest - signal for server that client is not ready to get more data from this partition.
+ * StartPartitionSessionResponse - signal for server that client is ready to get data from partition.
+ * StopPartitionSessionResponse - signal for server that client finished working with partition. Must be sent only after corresponding request from server.
+ * PauseReadRequest - signal for server that client is not ready to get more data from this partition.
* ResumeReadRequest - signal for server that client is ready to get more data from this partition.
+ * PartitionSessionStatusRequest - request for session status
+ * AddTopicRequest - request for add topic
+ * RemoveTopicRequest - request for topic removal
+ * UpdateTokenRequest - request to update auth token
*/
-message StreamingReadClientMessageNew {
+message StreamingReadClientMessage {
oneof client_message {
InitRequest init_request = 1;
ReadRequest read_request = 2;
- CreatePartitionStreamResponse create_partition_stream_response = 3;
+ StartPartitionSessionResponse start_partition_session_response = 3;
CommitRequest commit_request = 4;
- DestroyPartitionStreamResponse destroy_partition_stream_response = 5;
- StopReadRequest stop_read_request = 6;
+ StopPartitionSessionResponse stop_partition_session_response = 5;
+ PauseReadRequest pause_read_request = 6;
ResumeReadRequest resume_read_request = 7;
- PartitionStreamStatusRequest partition_stream_status_request = 8;
+ PartitionSessionStatusRequest partition_session_status_request = 8;
AddTopicRequest add_topic_request = 9;
RemoveTopicRequest remove_topic_request = 10;
+ UpdateTokenRequest update_token_request = 11;
}
- // User credentials if update is needed or empty string.
- string token = 20;
-
// Handshake request.
message InitRequest {
// Message that describes topic to read.
@@ -281,38 +302,38 @@ message StreamingReadClientMessageNew {
// Maximum block format version supported by the client. Server will asses this parameter and return actual data blocks version in
// StreamingReadServerMessage.InitResponse.block_format_version_by_topic (and StreamingReadServerMessage.AddTopicResponse.block_format_version)
// or error if client will not be able to read data.
- int64 max_supported_block_format_version = 6;
+ int64 max_supported_format_version = 6;
// Maximal size of client cache for message_group_id, ip and meta, per partition.
- // There is separate caches for each partition partition streams.
- // There is separate caches for message group identifiers, ip and meta inside one partition partition stream.
+ // There are separate caches for each partition partition sessions.
+ // There are separate caches for message group identifiers, ip and meta inside one partition session.
int64 max_meta_cache_size = 10;
// State of client read session. Could be provided to server for retries.
message State {
- message PartitionStreamState {
+ message PartitionSessionState {
enum Status {
// Not used state.
STATUS_UNSPECIFIED = 0;
- // Client seen Create message but not yet responded to server with Created message.
- CREATING = 1;
- // Client seen Destroy message but not yet responded with Released message.
- DESTROYING = 2;
- // Client sent Created or ResumeReadRequest message to server for this partition stream.
+ // Client saw StartPartitionSessionRequest message but not yet responded.
+ STARTING = 1;
+ // Client saw StopPartitionSessionRequest message but not yet responded.
+ STOPPING = 2;
+ // Client sent StartPartitionSessionResponse or ResumeReadRequest message to server for this partition session.
READING = 3;
- // Client sent StopReadRequest for this partition stream.
- STOPPED = 4;
+ // Client sent PauseReadRequest for this partition session.
+ PAUSED = 4;
}
- // Partition partition stream.
- PartitionStream partition_stream = 1;
+ // Partition partition session.
+ PartitionSession partition_session = 1;
// Current read offset if has one. Actual for states DESTROYING, READING and STOPPED.
int64 read_offset = 2;
// Ranges of committed by client offsets.
- repeated OffsetsRange offset_ranges = 3;
- // Status of partition stream.
+ repeated OffsetsRange offsets_ranges = 3;
+ // Status of partition session.
Status status = 4;
}
- repeated PartitionStreamState partition_streams_states = 1;
+ repeated PartitionSessionState partition_sessions_states = 1;
}
// Session identifier for retries. Must be the same as session_id from Inited server response. If this is first connect, not retry - do not use this field.
@@ -335,9 +356,9 @@ message StreamingReadClientMessageNew {
}
// Signal for server that cient is ready to recive data for partition.
- message CreatePartitionStreamResponse {
- // Partition stream identifier of partition to start read.
- int64 partition_stream_id = 1;
+ message StartPartitionSessionResponse {
+ // Partition session identifier of partition to start read.
+ int64 partition_session_id = 1;
// Start reading from partition offset that is not less than read_offset.
// Init.max_time_lag_ms and Init.read_timestamp_ms could lead to skip of more messages.
@@ -356,24 +377,24 @@ message StreamingReadClientMessageNew {
// Signal for server that client finished working with this partition. Must be sent only after corresponding Release request from server.
// Server will give this partition to other read session only after Released signal.
- message DestroyPartitionStreamResponse {
- // Partition stream identifier of partition partition stream that is released by client.
- int64 partition_stream_id = 1;
+ message StopPartitionSessionResponse {
+ // Partition session identifier of partition session that is released by client.
+ int64 partition_session_id = 1;
}
// Signal for server that client is not ready to recieve more data from this partition.
- message StopReadRequest {
- repeated int64 partition_stream_ids = 1;
+ message PauseReadRequest {
+ repeated int64 partition_session_ids = 1;
}
// Signal for server that client is ready to receive more data from this partition.
message ResumeReadRequest {
- repeated int64 partition_stream_ids = 1;
+ repeated int64 partition_session_ids = 1;
// Offset to start reading - may be smaller than known one in case of dropping of read-ahead in client lib.
repeated int64 read_offsets = 2;
- // Cookie for matching data from PartitionStream after resuming. Must be greater than zero.
+ // Cookie for matching data from PartitionSession after resuming. Must be greater than zero.
repeated int64 resume_cookies = 3;
}
@@ -383,16 +404,16 @@ message StreamingReadClientMessageNew {
repeated PartitionCommit commits = 1;
}
- message PartitionStreamStatusRequest {
- int64 partition_stream_id = 1;
+ message PartitionSessionStatusRequest {
+ int64 partition_session_id = 1;
}
- // Add topic to current read session
+ // Add topic to current read session.
message AddTopicRequest {
TopicReadSettings topic_read_settings = 1;
}
- // Remove topic from current read session
+ // Remove topic from current read session.
message RemoveTopicRequest {
string topic = 1;
}
@@ -411,40 +432,48 @@ message StreamingReadClientMessageNew {
* Message that is used for describing commit.
*/
message PartitionCommit {
- // Identifier of partition stream with data to commit.
- int64 partition_stream_id = 1;
+ // Identifier of partition session with data to commit.
+ int64 partition_session_id = 1;
// Processed ranges.
repeated OffsetsRange offsets = 2;
}
}
-// TODO: replace with it actual protocol server message
/**
* Response for read session. Contains one of :
* InitResponse - handshake response from server.
- * BatchReadResponse - portion of data.
+ * ReadResponse - portion of data.
+ * StartPartitionSessionRequest - command from server to create a partition partition session.
+ * StopPartitionSessionRequest - command from server to destroy a partition partition session.
* CommitResponse - acknowledgment for commit.
- * CreatePartitionStreamRequest - command from server to create a partition partition stream.
- * DestroyPartitionStreamRequest - command from server to destroy a partition partition stream.
+ * PartitionSessionStatusResponse - server response with partition session status.
+ * PauseReadResponse - acknowledgment for pausing read from this partition.
+ * ResumeReadResponse - acknowledgment for resuming read from this partition.
+ * AddTopicResponse - acknowledgment of topic adding.
+ * RemoveTopicResponse - acknowledgment of topic removal.
+ * UpdateTokenResponse - acknowledgment of token update.
*/
-message StreamingReadServerMessageNew {
- oneof server_message {
- InitResponse init_response = 3;
- BatchReadResponse batch_read_response = 4;
- CreatePartitionStreamRequest create_partition_stream_request = 5;
- DestroyPartitionStreamRequest destroy_partition_stream_request = 6;
- CommitResponse commit_response = 7;
- PartitionStreamStatusResponse partition_stream_status_response = 8;
- StopReadResponse stop_read_response = 9;
- ResumeReadResponse resume_read_response = 10;
- AddTopicResponse add_topic_response = 11;
- RemoveTopicResponse remove_topic_response = 12;
- }
-
+message StreamingReadServerMessage {
+ // Server status of response.
Ydb.StatusIds.StatusCode status = 1;
+ // Issues if any.
repeated Ydb.Issue.IssueMessage issues = 2;
+ oneof server_message {
+ InitResponse init_response = 3;
+ ReadResponse read_response = 4;
+ StartPartitionSessionRequest start_partition_session_request = 6;
+ StopPartitionSessionRequest stop_partition_session_request = 7;
+ CommitResponse commit_response = 8;
+ PartitionSessionStatusResponse partition_session_status_response = 9;
+ PauseReadResponse pause_read_response = 10;
+ ResumeReadResponse resume_read_response = 11;
+ AddTopicResponse add_topic_response = 12;
+ RemoveTopicResponse remove_topic_response = 13;
+ UpdateTokenResponse update_token_response = 14;
+ }
+
// Handshake response.
message InitResponse {
// Read session identifier for debug purposes.
@@ -452,16 +481,16 @@ message StreamingReadServerMessageNew {
// Block format version of data client will receive from topics.
map<string, int64> block_format_version_by_topic = 2;
- // Choosed maximan cache size by server.
+ // Chosen maximal cache size by server.
// Client must use cache of this size. Could change on retries - reduce size of cache in this case.
int64 max_meta_cache_size = 10;
}
- // Command to create a partition partition stream.
+ // Command to create and start a partition session.
// Client must react on this signal by sending StartRead when ready recieve data from this partition.
- message CreatePartitionStreamRequest {
+ message StartPartitionSessionRequest {
// Partition partition stream description.
- PartitionStream partition_stream = 1;
+ PartitionSession partition_session = 1;
// Actual committed offset.
int64 committed_offset = 2;
@@ -470,17 +499,17 @@ message StreamingReadServerMessageNew {
}
- // Command to destroy concrete partition stream.
- message DestroyPartitionStreamRequest {
- // Identifier of partition partition stream that is ready to be closed by server.
- int64 partition_stream_id = 1;
+ // Command to stop and destroy concrete partition session.
+ message StopPartitionSessionRequest {
+ // Identifier of partition partition session that is ready to be closed by server.
+ int64 partition_session_id = 1;
- // Flag of gracefull or not destroy.
- // If True then server is waiting for Destroyed signal from client before giving of this partition for other read session.
+ // Flag of graceful stop.
+ // If True then server is waiting for response from client before giving this partition to another read session.
// Server will not send more data from this partition.
- // Client can process all received data and wait for commit and only after send Destroyed signal.
+ // Client can process all received data and wait for commit and only after send response.
// If False then server gives partition for other session right now.
- // All futher commits for this PartitionStream has no effect. Server is not waiting for Destroyed signal.
+ // All further commits for this PartitionSession have no effect. Server is not waiting for response.
bool graceful = 2;
// Last known committed offset.
@@ -491,8 +520,8 @@ message StreamingReadServerMessageNew {
message CommitResponse {
// Per-partition commit representation.
message PartitionCommittedOffset {
- // Partition partition stream identifier.
- int64 partition_stream_id = 1;
+ // Partition partition session identifier.
+ int64 partition_session_id = 1;
// Last committed offset.
int64 committed_offset = 2;
}
@@ -500,119 +529,58 @@ message StreamingReadServerMessageNew {
repeated PartitionCommittedOffset partitions_committed_offsets = 1;
}
- // Readed data.
- message BatchReadResponse {
+ // Data read.
+ message ReadResponse {
// One client message representation.
- // Client lib must send commit right now for all skipped data (from it's read offset till first offset in range).
- message PartitionData {
- // Data inside this message is from partition stream with this identifier.
- int64 partition_stream_id = 1;
-
- // Offsets in partition that assigned for messages.
- // Unique value for clientside deduplication - (topic, cluster, partition_id, offset).
- repeated int64 offsets = 2;
- // Sequence numbers that provided with messages on write from client.
- // Same size as offsets.
- // Unique value for clientside deduplication - (topic, cluster, message_group_id, sequence_number).
- repeated int64 sequence_numbers = 3;
- // Timestamps of creation of messages provided on write from client.
- // Same size as offsets.
- repeated int64 created_at_ms = 4;
- // Timestamps of writing in partition for client messages.
- // Same size as offsets.
- repeated int64 written_at_ms = 5;
-
- // New messageGroupIds for updating cache.
- // Size of vector is the same as number of negative values in message_group_id_indexes.
- repeated string message_group_ids = 6;
- // Indexes of messageGroupIds.
- // same size as offsets.
- // Negative values (-X) means - put first not used messageGroupId from message_group_ids on index X in cache and use it for this client message.
- // Positive values (X) means -use element by index X from cache for this client message. Do not change state of cache.
- // Assumptions:
- // - Server will use positive values only for proposed before indexes.
- // - Each value is from 1 to max_meta_cache_size by abs.
- // - Do not make assumptions about choosing algorihm.
- // - There is separate caches of size max_meta_cache_size for different partition and different metadata fileds - message_group_id, ip and session_meta_data.
- // - Number of negative values in message_group_id_indexes vector is the same as length of message_group_ids vector.
- // Example:
- // max_meta_cache_size : 2
- // Cache indexes : 1 2
- // Cache state before processing : s0,? // ? means not set yet.
- //
- // message_group_ids : s1 s2 s3 s1
- // message_group_id_indexes : -1 -2 1 2 1 1 -1 2 -2
- // cache state : s1,? s1,s2 s1,s2 s1,s2 s1,s2 s1,s2 s3,s2 s3,s2 s3,s1
- // real message group ids : s1 s2 s1 s2 s1 s1 s3 s2 s1
- // Cache indexes : 1 2
- // Cache state after processing : s3,s1
- repeated sint64 message_group_id_indexes = 7;
-
- // New ips for updating ip cache.
- repeated string ips = 8;
- // Same as message_group_id_indexes but for ips.
- repeated sint64 ip_indexes = 9;
-
- // New session meta datas for updating cache.
- repeated SessionMetaValue message_session_meta = 10;
- // Same as message_group_id_indexes but for session meta data.
- repeated sint64 message_session_meta_indexes = 11;
-
- // Client messages sizes.
- // Same size as offsets.
- repeated int64 message_sizes = 12;
-
- // Block must contain whole client message when it's size is not bigger that max_block_size.
- // If message is bigger than max_block_size - it will be transferred as SIZE/max_block_size blocks. All of this blocks will be with block_count = 0 but not the last one - last one's block_count will be 0;
- // Blocks can be reordered upto provided by client uncompressed free buffer size.
- // blocks: A A A B B B CDE
- // offset: 1 1 1 4 4 4 6
- // part_number: 0 1 2 0 1 2 0
- // count: 0 0 1 0 0 1 3
- // Offset will be the same as in Offsets.
- repeated int64 blocks_offsets = 13;
- repeated int64 blocks_part_numbers = 14;
- // How many complete messages and imcomplete messages end parts (one at most) this block contains
- repeated int64 blocks_message_counts = 15;
- repeated int64 blocks_uncompressed_sizes = 16;
- // In block format version 0 each byte contains only block codec identifier
- repeated bytes blocks_headers = 17;
- repeated bytes blocks_data = 18;
-
- // Zero if this is not first portion of data after resume or provided by client cookie otherwise.
- int64 resume_cookie = 50;
-
- message ReadStatistics {
- int64 blobs_from_cache = 1;
- int64 blobs_from_disk = 2;
- int64 bytes_from_head = 3;
- int64 bytes_from_cache = 4;
- int64 bytes_from_disk = 5;
- int64 repack_duration_ms = 6;
- }
- ReadStatistics read_statistics = 100;
+ message MessageData {
+ // Partition offset in partition that is assigned for this message.
+ int64 offset = 1; //unique value for clientside deduplication - Topic:Cluster:Partition:Offset
+ // Sequence number that provided with message on write from client.
+ int64 seq_no = 2;
+ // Timestamp of creation of message provided on write from client.
+ int64 create_timestamp_ms = 3;
+ // Codec that is used for data compressing.
+ Codec codec = 4;
+ // Compressed client message body.
+ bytes data = 5;
+ // Uncompressed size of client message body.
+ int64 uncompressed_size = 6;
+ // kinesis data
+ string partition_key = 7;
+ bytes explicit_hash = 8;
}
- message SkipRange {
- // Partition Stream identifier.
- int64 partition_stream_id = 1;
+ // Representation of sequence of client messages from one write session.
+ message Batch {
+ // Source identifier provided by client for this batch of client messages.
+ bytes message_group_id = 2;
+ // Client metadata attached to write session, the same for all messages in batch.
+ MetaValue session_meta = 3;
+ // Persist timestamp on server for batch.
+ int64 write_timestamp_ms = 4;
+ // Peer address of node that created write session.
+ string ip = 5;
- // When some data is skipped by client parameters (read_timestamp_ms for example) then range of skipped offsets is sended to client.
- // Client lib must commit this range and change read_offset to end of this range.
- OffsetsRange skip_range = 2;
+ // List of client messages.
+ repeated MessageData message_data = 1;
}
- repeated SkipRange skip_range = 1;
+ // Representation of sequence of messages from one partition.
+ message PartitionData {
+ int64 partition_session_id = 1;
- // Per-partition data.
- repeated PartitionData partitions = 2;
+ // Client messages, divided by write sessions.
+ repeated Batch batches = 2;
+ }
+ // Client messages, divided by partitions.
+ repeated PartitionData partition_data = 1;
}
- // Response for status requst.
- message PartitionStreamStatusResponse {
- // Identifier of partition partition stream that is ready to be closed by server.
- int64 partition_stream_id = 1;
+ // Response for status request.
+ message PartitionSessionStatusResponse {
+ // Identifier of partition partition session that is ready to be closed by server.
+ int64 partition_session_id = 1;
int64 committed_offset = 2;
int64 end_offset = 3;
@@ -621,7 +589,7 @@ message StreamingReadServerMessageNew {
int64 written_at_watermark_ms = 4;
}
- message StopReadResponse {
+ message PauseReadResponse {
}
message ResumeReadResponse {
@@ -664,7 +632,6 @@ message PartitionStream {
* Start_read - signal for server that client is ready to get data from partition.
* Released - signal for server that client finished working with partition. Must be sent only after corresponding Release request from server.
*/
-
message MigrationStreamingReadClientMessage {
message TopicReadSettings {
// Topic path.
@@ -824,8 +791,6 @@ message MigrationStreamingReadClientMessage {
bytes token = 20;
}
-
-
/**
* Response for read session. Contains one of :
* Inited - handshake response from server.
@@ -834,7 +799,6 @@ message MigrationStreamingReadClientMessage {
* Assigned - signal from server for assigning of partition.
* Release - signal from server for releasing of partition.
*/
-
message MigrationStreamingReadServerMessage {
// Handshake response.
message InitResponse {
@@ -986,7 +950,6 @@ message MigrationStreamingReadServerMessage {
/**
* Reading information request sent from client to server.
*/
-
message ReadInfoRequest {
Ydb.Operations.OperationParams operation_params = 1;
// List of topics that are beeing read.
@@ -1001,7 +964,6 @@ message ReadInfoRequest {
/**
* Reading information response sent from server to client.
*/
-
message ReadInfoResponse {
// Result of request will be inside operation.
Ydb.Operations.Operation operation = 1;
@@ -1010,7 +972,6 @@ message ReadInfoResponse {
/**
* Reading information message that will be inside ReadInfoResponse.operation.
*/
-
message ReadInfoResult {
// Message containing information about concrete topic reading.
message TopicInfo {
@@ -1079,11 +1040,9 @@ message ReadInfoResult {
repeated TopicInfo topics = 1;
}
-
/**
* Drop topic request sent from client to server.
*/
-
message DropTopicRequest {
Ydb.Operations.OperationParams operation_params = 1;
// Topic path.
@@ -1094,7 +1053,6 @@ message DropTopicRequest {
/**
* Drop topic response sent from server to client. If topic is not existed then response status will be "SCHEME_ERROR".
*/
-
message DropTopicResponse {
// Result of request will be inside operation.
Ydb.Operations.Operation operation = 1;
@@ -1103,14 +1061,12 @@ message DropTopicResponse {
/**
* Drop topic result message that will be inside DropTopicResponse.operation.
*/
-
message DropTopicResult {
}
/**
* Credentials settings
*/
-
message Credentials {
message Iam {
string endpoint = 1;
@@ -1126,7 +1082,6 @@ message Credentials {
/**
* Message for describing topic internals.
*/
-
message TopicSettings {
enum Format {
FORMAT_UNSPECIFIED = 0;
@@ -1212,7 +1167,6 @@ message TopicSettings {
/**
* Create topic request sent from client to server.
*/
-
message CreateTopicRequest {
Ydb.Operations.OperationParams operation_params = 1;
// Topic path.
@@ -1225,7 +1179,6 @@ message CreateTopicRequest {
/**
* Create topic response sent from server to client. If topic is already exists then response status will be "ALREADY_EXISTS".
*/
-
message CreateTopicResponse {
// Result of request will be inside operation.
Ydb.Operations.Operation operation = 1;
@@ -1234,14 +1187,12 @@ message CreateTopicResponse {
/**
* Create topic result message that will be inside CreateTopicResponse.operation.
*/
-
message CreateTopicResult {
}
/**
* Update existing topic request sent from client to server.
*/
-
message AlterTopicRequest {
Ydb.Operations.OperationParams operation_params = 1;
// Topic path.
@@ -1250,11 +1201,9 @@ message AlterTopicRequest {
TopicSettings settings = 4;
}
-
/**
* Update topic response sent from server to client.
*/
-
message AlterTopicResponse {
// Result of request will be inside operation.
Ydb.Operations.Operation operation = 1;
@@ -1291,7 +1240,6 @@ message AddReadRuleResponse {
message AddReadRuleResult {
}
-
/**
* Remove read rules request for existing topic.
*/
@@ -1317,22 +1265,18 @@ message RemoveReadRuleResponse {
message RemoveReadRuleResult {
}
-
/**
* Describe topic request sent from client to server.
*/
-
message DescribeTopicRequest {
Ydb.Operations.OperationParams operation_params = 1;
// Topic path.
string path = 2 [(required) = true];
}
-
/**
* Describe topic response sent from server to client. If topic is not existed then response status will be "SCHEME_ERROR".
*/
-
message DescribeTopicResponse {
// Result of request will be inside operation.
Ydb.Operations.Operation operation = 1;
@@ -1341,7 +1285,6 @@ message DescribeTopicResponse {
/**
* Describe topic result message that will be inside DescribeTopicResponse.operation.
*/
-
message DescribeTopicResult {
// Topic path.
Ydb.Scheme.Entry self = 1;
@@ -1349,341 +1292,3 @@ message DescribeTopicResult {
TopicSettings settings = 2;
}
-
-///////////////////////////////////////////////////////////
-
-message StreamingReadClientMessage {
- /**
- * Message that is used for describing commit.
- */
- message PartitionCommit {
- // Identifier of partition stream with data to commit.
- int64 partition_stream_id = 1;
- // Processed ranges.
- repeated OffsetsRange offsets = 2;
- }
-
- message TopicReadSettings {
- // Topic path.
- string topic = 1;
- // Partition groups that will be read by this session.
- // If list is empty - then session will read all partition groups.
- repeated int64 partition_group_ids = 2;
- // Read data only after this timestamp from this topic.
- int64 start_from_written_at_ms = 3;
- }
-
- // Handshake request.
- message InitRequest {
- // Message that describes topic to read.
- // Topics that will be read by this session.
- repeated TopicReadSettings topics_read_settings = 1;
- // Flag that indicates reading only of original topics in cluster or all including mirrored.
- bool read_only_original = 2;
- // Path of consumer that is used for reading by this session.
- string consumer = 3;
-
- // Skip all messages that has write timestamp smaller than now - max_time_lag_ms.
- int64 max_lag_duration_ms = 4;
- // Read data only after this timestamp from all topics.
- int64 start_from_written_at_ms = 5;
-
- // Maximum block format version supported by the client. Server will asses this parameter and return actual data blocks version in
- // StreamingReadServerMessage.InitResponse.block_format_version_by_topic (and StreamingReadServerMessage.AddTopicResponse.block_format_version)
- // or error if client will not be able to read data.
- int64 max_supported_block_format_version = 6;
-
- // Maximal size of client cache for message_group_id, ip and meta, per partition.
- // There is separate caches for each partition partition streams.
- // There is separate caches for message group identifiers, ip and meta inside one partition partition stream.
- int64 max_meta_cache_size = 10;
-
- // State of client read session. Could be provided to server for retries.
- message State {
- message PartitionStreamState {
- enum Status {
- // Not used state.
- STATUS_UNSPECIFIED = 0;
- // Client seen Create message but not yet responded to server with Created message.
- CREATING = 1;
- // Client seen Destroy message but not yet responded with Released message.
- DESTROYING = 2;
- // Client sent Created or ResumeReadRequest message to server for this partition stream.
- READING = 3;
- // Client sent StopReadRequest for this partition stream.
- STOPPED = 4;
- }
- // Partition partition stream.
- PartitionStream partition_stream = 1;
- // Current read offset if has one. Actual for states DESTROYING, READING and STOPPED.
- int64 read_offset = 2;
- // Ranges of committed by client offsets.
- repeated OffsetsRange offset_ranges = 3;
- // Status of partition stream.
- Status status = 4;
- }
- repeated PartitionStreamState partition_streams_states = 1;
- }
-
- // Session identifier for retries. Must be the same as session_id from Inited server response. If this is first connect, not retry - do not use this field.
- string session_id = 100;
- // 0 for first init message and incremental value for connect retries.
- int64 connection_attempt = 101;
- // Formed state for retries. If not retry - do not use this field.
- State state = 102;
-
- int64 idle_timeout_ms = 200;
-
-
- ////////////////////////////////////////////////////////////////////////////////////////////////////////////
- // TODO: remove after adding BatchReadResponse
- // Single read request params.
- ReadParams read_params = 42;
- }
-
- // TODO: add topics/groups and remove them from reading
-
- // Message that represents client readiness for receiving more data.
- message ReadRequest {
- // Client acquired this amount of free bytes more for buffer. Server can send more data at most of this uncompressed size.
- // Subsequent messages with 5 and 10 request_uncompressed_size are treated by server that it can send messages for at most 15 bytes.
- int64 request_uncompressed_size = 1;
- }
-
- // Signal for server that cient is ready to recive data for partition.
- message CreatePartitionStreamResponse {
- // Partition stream identifier of partition to start read.
- int64 partition_stream_id = 1;
-
- // Start reading from partition offset that is not less than read_offset.
- // Init.max_time_lag_ms and Init.read_timestamp_ms could lead to skip of more messages.
- // The same with actual committed offset. Regardless of set read_offset server will return data from maximal offset from read_offset, actual committed offset
- // and offsets calculated from Init.max_time_lag_ms and Init.read_timestamp_ms.
- int64 read_offset = 2;
- // All messages with offset less than commit_offset are processed by client. Server will commit this position if this is not done yet.
- int64 commit_offset = 3;
-
- // This option will enable sanity check on server for read_offset. Server will verify that read_offset is no less that actual committed offset.
- // If verification will fail then server will kill this read session and client will find out error in reading logic.
- // If client is not setting read_offset, sanity check will fail so do not set verify_read_offset if you not setting correct read_offset.
- bool verify_read_offset = 4;
-
- }
-
- // Signal for server that client processed some read data.
- message CommitRequest {
- // Partition offsets that indicates processed data.
- repeated PartitionCommit commits = 1;
- }
-
- // Signal for server that client finished working with this partition. Must be sent only after corresponding Release request from server.
- // Server will give this partition to other read session only after Released signal.
- message DestroyPartitionStreamResponse {
- // Partition stream identifier of partition partition stream that is released by client.
- int64 partition_stream_id = 1;
- }
-
- // Signal for server that client is not ready to recieve more data from this partition.
- message StopReadRequest {
- repeated int64 partition_stream_ids = 1;
- }
-
- // Signal for server that client is ready to receive more data from this partition.
- message ResumeReadRequest {
- repeated int64 partition_stream_ids = 1;
-
- // Offset to start reading - may be smaller than known one in case of dropping of read-ahead in client lib.
- repeated int64 read_offsets = 2;
-
- // Cookie for matching data from PartitionStream after resuming. Must be greater than zero.
- repeated int64 resume_cookies = 3;
- }
-
- message PartitionStreamStatusRequest {
- int64 partition_stream_id = 1;
- }
-
- // Add topic to current read session
- message AddTopicRequest {
- TopicReadSettings topic_read_settings = 1;
- }
-
- // Remove topic from current read session
- message RemoveTopicRequest {
- string topic = 1;
- }
-
- oneof client_message {
- InitRequest init_request = 1;
- ReadRequest read_request = 2;
- CreatePartitionStreamResponse create_partition_stream_response = 3;
- CommitRequest commit_request = 4;
- DestroyPartitionStreamResponse destroy_partition_stream_response = 5;
- StopReadRequest stop_read_request = 6;
- ResumeReadRequest resume_read_request = 7;
- PartitionStreamStatusRequest partition_stream_status_request = 8;
- AddTopicRequest add_topic_request = 9;
- RemoveTopicRequest remove_topic_request = 10;
- UpdateTokenRequest update_token_request = 11;
- }
-}
-
-message StreamingReadServerMessage {
- // Handshake response.
- message InitResponse {
- // Read session identifier for debug purposes.
- string session_id = 1;
- // Block format version of data client will receive from topics.
- map<string, int64> block_format_version_by_topic = 2;
-
- // Choosed maximal cache size by server.
- // Client must use cache of this size. Could change on retries - reduce size of cache in this case.
- int64 max_meta_cache_size = 10;
- }
-
- // Readed data.
- message DataBatch {
- // One client message representation.
- message MessageData {
- // Partition offset in partition that assigned for message.
- uint64 offset = 1; //unique value for clientside deduplication - Topic:Cluster:Partition:Offset
- // Sequence number that provided with message on write from client.
- uint64 seq_no = 2;
- // Timestamp of creation of message provided on write from client.
- uint64 create_timestamp_ms = 3;
- // Codec that is used for data compressing.
- Codec codec = 4;
- // Compressed client message body.
- bytes data = 5;
- // Uncompressed size of client message body.
- uint64 uncompressed_size = 6;
- // kinesis data
- string partition_key = 7;
- bytes explicit_hash = 8;
- }
-
- // Representation of sequence of client messages from one write session.
- message Batch {
- // Source identifier provided by client for this batch of client messages.
- bytes source_id = 2;
- // Client metadata attached to write session, the same for all messages in batch.
- repeated KeyValue extra_fields = 3;
- // Persist timestamp on server for batch.
- uint64 write_timestamp_ms = 4;
- // Peer address of node that created write session.
- string ip = 5;
-
- // List of client messages.
- repeated MessageData message_data = 1;
- }
-
- // Representation of sequence of messages from one partition.
- message PartitionData {
- // Partition's topic path.
- Path topic = 1;
- // Topic's instance cluster name.
- string cluster = 2;
- // Partition identifier. topic:cluster:partition is unique addressing for partition.
- uint64 partition = 3;
-
- // Client messages, divided by write sessions.
- repeated Batch batches = 4;
-
- // Cookie for addressing this partition messages batch for committing.
- CommitCookie cookie = 5;
-
- // Old formatted topic name with cluster inside.
- string deprecated_topic = 10;
- }
-
- // Client messages, divided by partitions.
- repeated PartitionData partition_data = 1;
- }
-
- // Command to create a partition partition stream.
- // Client must react on this signal by sending StartRead when ready recieve data from this partition.
- message CreatePartitionStreamRequest {
- // Partition partition stream description.
- PartitionStream partition_stream = 1;
-
- // Actual committed offset.
- int64 committed_offset = 2;
- // Offset of first not existing message in partition till now.
- int64 end_offset = 3;
-
- }
-
- // Command to destroy concrete partition stream.
- message DestroyPartitionStreamRequest {
- // Identifier of partition partition stream that is ready to be closed by server.
- int64 partition_stream_id = 1;
-
- // Flag of gracefull or not destroy.
- // If True then server is waiting for Destroyed signal from client before giving of this partition for other read session.
- // Server will not send more data from this partition.
- // Client can process all received data and wait for commit and only after send Destroyed signal.
- // If False then server gives partition for other session right now.
- // All futher commits for this PartitionStream has no effect. Server is not waiting for Destroyed signal.
- bool graceful = 2;
-
- // Last known committed offset.
- int64 committed_offset = 3;
- }
-
- // Acknowledgement for commits.
- message CommitResponse {
- // Per-partition commit representation.
- message PartitionCommittedOffset {
- // Partition partition stream identifier.
- int64 partition_stream_id = 1;
- // Last committed offset.
- int64 committed_offset = 2;
- }
- // Partitions with progress.
- repeated PartitionCommittedOffset partitions_committed_offsets = 1;
- }
-
- // Response for status requst.
- message PartitionStreamStatusResponse {
- // Identifier of partition partition stream that is ready to be closed by server.
- int64 partition_stream_id = 1;
-
- int64 committed_offset = 2;
- int64 end_offset = 3;
-
- // WriteTimestamp of next message (and end_offset) will be not less that WriteWatermarkMs.
- int64 written_at_watermark_ms = 4;
- }
-
- message StopReadResponse {
- }
-
- message ResumeReadResponse {
- }
-
- message AddTopicResponse {
- // Block format version of data client will receive from the topic.
- int64 block_format_version = 1;
- }
-
- message RemoveTopicResponse {
- }
-
- Ydb.StatusIds.StatusCode status = 1;
-
- repeated Ydb.Issue.IssueMessage issues = 2;
-
- oneof server_message {
- InitResponse init_response = 3;
- DataBatch data_batch = 4;
- CreatePartitionStreamRequest create_partition_stream_request = 5;
- DestroyPartitionStreamRequest destroy_partition_stream_request = 6;
- CommitResponse commit_response = 7;
- PartitionStreamStatusResponse partition_stream_status_response = 8;
- StopReadResponse stop_read_response = 9;
- ResumeReadResponse resume_read_response = 10;
- AddTopicResponse add_topic_response = 11;
- RemoveTopicResponse remove_topic_response = 12;
- UpdateTokenResponse update_token_response = 13;
- }
-}
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.cpp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.cpp
index fab21d7617f..ea771e701c5 100644
--- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.cpp
@@ -543,7 +543,7 @@ void TWriteSession::InitImpl() {
if (Settings.PartitionGroupId_) {
init->set_partition_group_id(*Settings.PartitionGroupId_);
}
- init->set_max_supported_block_format_version(0);
+ init->set_max_supported_format_version(0);
init->set_preferred_cluster(PreferredClusterByCDS);
for (const auto& attr : Settings.Meta_.Fields) {
diff --git a/ydb/services/persqueue_v1/actors/helpers.cpp b/ydb/services/persqueue_v1/actors/helpers.cpp
index 46adacd6ff1..779cf312c13 100644
--- a/ydb/services/persqueue_v1/actors/helpers.cpp
+++ b/ydb/services/persqueue_v1/actors/helpers.cpp
@@ -17,11 +17,11 @@ bool RemoveEmptyMessages(PersQueue::V1::MigrationStreamingReadServerMessage::Dat
}
// TODO: remove after refactor
-bool RemoveEmptyMessages(PersQueue::V1::StreamingReadServerMessage::DataBatch& data) {
- auto batchRemover = [&](PersQueue::V1::StreamingReadServerMessage::DataBatch::Batch& batch) -> bool {
+bool RemoveEmptyMessages(PersQueue::V1::StreamingReadServerMessage::ReadResponse& data) {
+ auto batchRemover = [&](PersQueue::V1::StreamingReadServerMessage::ReadResponse::Batch& batch) -> bool {
return batch.message_data_size() == 0;
};
- auto partitionDataRemover = [&](PersQueue::V1::StreamingReadServerMessage::DataBatch::PartitionData& partition) -> bool {
+ auto partitionDataRemover = [&](PersQueue::V1::StreamingReadServerMessage::ReadResponse::PartitionData& partition) -> bool {
NProtoBuf::RemoveRepeatedFieldItemIf(partition.mutable_batches(), batchRemover);
return partition.batches_size() == 0;
};
diff --git a/ydb/services/persqueue_v1/actors/helpers.h b/ydb/services/persqueue_v1/actors/helpers.h
index ed1b243962a..1a46d93a308 100644
--- a/ydb/services/persqueue_v1/actors/helpers.h
+++ b/ydb/services/persqueue_v1/actors/helpers.h
@@ -9,6 +9,6 @@ using namespace Ydb;
bool RemoveEmptyMessages(PersQueue::V1::MigrationStreamingReadServerMessage::DataBatch& data);
// TODO: remove after grpc refactor
-bool RemoveEmptyMessages(PersQueue::V1::StreamingReadServerMessage::DataBatch& data);
+bool RemoveEmptyMessages(PersQueue::V1::StreamingReadServerMessage::ReadResponse& data);
}
diff --git a/ydb/services/persqueue_v1/actors/partition_actor.cpp b/ydb/services/persqueue_v1/actors/partition_actor.cpp
index f37fc307963..7139e22936a 100644
--- a/ydb/services/persqueue_v1/actors/partition_actor.cpp
+++ b/ydb/services/persqueue_v1/actors/partition_actor.cpp
@@ -254,25 +254,62 @@ void TPartitionActor::Handle(const TEvPQProxy::TEvRestartPipe::TPtr&, const TAct
}
}
-// TODO: keep only Migration version
-template<typename TServerMessage>
+TString GetBatchSourceId(PersQueue::V1::MigrationStreamingReadServerMessage::DataBatch::Batch* batch) {
+ Y_VERIFY(batch);
+ return batch->source_id();
+}
+
+TString GetBatchSourceId(PersQueue::V1::StreamingReadServerMessage::ReadResponse::Batch* batch) {
+ Y_VERIFY(batch);
+ return batch->message_group_id();
+}
+
+void SetBatchSourceId(PersQueue::V1::MigrationStreamingReadServerMessage::DataBatch::Batch* batch, TString value) {
+ Y_VERIFY(batch);
+ batch->set_source_id(std::move(value));
+}
+
+void SetBatchSourceId(PersQueue::V1::StreamingReadServerMessage::ReadResponse::Batch* batch, TString value) {
+ Y_VERIFY(batch);
+ batch->set_message_group_id(std::move(value));
+}
+
+void SetBatchExtraField(PersQueue::V1::MigrationStreamingReadServerMessage::DataBatch::Batch* batch, TString key, TString value) {
+ Y_VERIFY(batch);
+ auto* item = batch->add_extra_fields();
+ item->set_key(std::move(key));
+ item->set_value(std::move(value));
+}
+
+void SetBatchExtraField(PersQueue::V1::StreamingReadServerMessage::ReadResponse::Batch* batch, TString key, TString value) {
+ Y_VERIFY(batch);
+ (*batch->mutable_session_meta()->mutable_value())[key] = std::move(value);
+}
+
+template<typename TReadResponse>
bool FillBatchedData(
- typename TServerMessage::DataBatch * data, const NKikimrClient::TCmdReadResult& res,
+ TReadResponse* data, const NKikimrClient::TCmdReadResult& res,
const TPartitionId& Partition, ui64 ReadIdToResponse, ui64& ReadOffset, ui64& WTime, ui64 EndOffset,
const NPersQueue::TTopicConverterPtr& topic, const TActorContext& ctx) {
-
+ constexpr bool UseMigrationProtocol = std::is_same_v<TReadResponse, PersQueue::V1::MigrationStreamingReadServerMessage::DataBatch>;
auto* partitionData = data->add_partition_data();
- partitionData->mutable_topic()->set_path(topic->GetFederationPath());
- partitionData->set_cluster(topic->GetCluster());
- partitionData->set_partition(Partition.Partition);
- partitionData->set_deprecated_topic(topic->GetClientsideName());
- partitionData->mutable_cookie()->set_assign_id(Partition.AssignId);
- partitionData->mutable_cookie()->set_partition_cookie(ReadIdToResponse);
+
+ if constexpr (UseMigrationProtocol) {
+ partitionData->mutable_topic()->set_path(topic->GetFederationPath());
+ partitionData->set_cluster(topic->GetCluster());
+ partitionData->set_partition(Partition.Partition);
+ partitionData->set_deprecated_topic(topic->GetClientsideName());
+ partitionData->mutable_cookie()->set_assign_id(Partition.AssignId);
+ partitionData->mutable_cookie()->set_partition_cookie(ReadIdToResponse);
+
+ } else {
+ partitionData->set_partition_session_id(Partition.AssignId);
+ }
bool hasOffset = false;
bool hasData = false;
- typename TServerMessage::DataBatch::Batch* currentBatch = nullptr;
+ typename TReadResponse::Batch* currentBatch = nullptr;
for (ui32 i = 0; i < res.ResultSize(); ++i) {
const auto& r = res.GetResult(i);
WTime = r.GetWriteTimestampMS();
@@ -294,42 +331,34 @@ bool FillBatchedData(
sourceId = NPQ::NSourceIdEncoding::Decode(r.GetSourceId());
}
- if (!currentBatch || currentBatch->write_timestamp_ms() != r.GetWriteTimestampMS() || currentBatch->source_id() != sourceId) {
+ if (!currentBatch
+ || static_cast<i64>(currentBatch->write_timestamp_ms()) != static_cast<i64>(r.GetWriteTimestampMS())
+ || GetBatchSourceId(currentBatch) != sourceId) {
// If write time and source id are the same, the rest fields will be the same too.
currentBatch = partitionData->add_batches();
currentBatch->set_write_timestamp_ms(r.GetWriteTimestampMS());
- currentBatch->set_source_id(sourceId);
+ SetBatchSourceId(currentBatch, std::move(sourceId));
if (proto.HasMeta()) {
const auto& header = proto.GetMeta();
if (header.HasServer()) {
- auto* item = currentBatch->add_extra_fields();
- item->set_key("server");
- item->set_value(header.GetServer());
+ SetBatchExtraField(currentBatch, "server", header.GetServer());
}
if (header.HasFile()) {
- auto* item = currentBatch->add_extra_fields();
- item->set_key("file");
- item->set_value(header.GetFile());
+ SetBatchExtraField(currentBatch, "file", header.GetFile());
}
if (header.HasIdent()) {
- auto* item = currentBatch->add_extra_fields();
- item->set_key("ident");
- item->set_value(header.GetIdent());
+ SetBatchExtraField(currentBatch, "ident", header.GetIdent());
}
if (header.HasLogType()) {
- auto* item = currentBatch->add_extra_fields();
- item->set_key("logtype");
- item->set_value(header.GetLogType());
+ SetBatchExtraField(currentBatch, "logtype", header.GetLogType());
}
}
if (proto.HasExtraFields()) {
const auto& map = proto.GetExtraFields();
for (const auto& kv : map.GetItems()) {
- auto* item = currentBatch->add_extra_fields();
- item->set_key(kv.GetKey());
- item->set_value(kv.GetValue());
+ SetBatchExtraField(currentBatch, kv.GetKey(), kv.GetValue());
}
}
@@ -506,11 +535,11 @@ void TPartitionActor::Handle(TEvPersQueue::TEvResponse::TPtr& ev, const TActorCo
bool hasData = false;
if (UseMigrationProtocol) {
- auto* data = migrationResponse.mutable_data_batch();
- hasData = FillBatchedData<MigrationStreamingReadServerMessage>(data, res, Partition, ReadIdToResponse, ReadOffset, WTime, EndOffset, Topic, ctx);
+ typename MigrationStreamingReadServerMessage::DataBatch* data = migrationResponse.mutable_data_batch();
+ hasData = FillBatchedData<MigrationStreamingReadServerMessage::DataBatch>(data, res, Partition, ReadIdToResponse, ReadOffset, WTime, EndOffset, Topic, ctx);
} else {
- auto* data = response.mutable_data_batch();
- hasData = FillBatchedData<StreamingReadServerMessage>(data, res, Partition, ReadIdToResponse, ReadOffset, WTime, EndOffset, Topic, ctx);
+ StreamingReadServerMessage::ReadResponse* data = response.mutable_read_response();
+ hasData = FillBatchedData<StreamingReadServerMessage::ReadResponse>(data, res, Partition, ReadIdToResponse, ReadOffset, WTime, EndOffset, Topic, ctx);
}
WriteTimestampEstimateMs = Max(WriteTimestampEstimateMs, WTime);
diff --git a/ydb/services/persqueue_v1/actors/read_session_actor.ipp b/ydb/services/persqueue_v1/actors/read_session_actor.ipp
index 2cbc42deebb..a329268ae85 100644
--- a/ydb/services/persqueue_v1/actors/read_session_actor.ipp
+++ b/ydb/services/persqueue_v1/actors/read_session_actor.ipp
@@ -125,7 +125,7 @@ void TReadSessionActor<UseMigrationProtocol>::Handle(typename IContext::TEvReadF
if constexpr (UseMigrationProtocol) {
return request.assign_id();
} else {
- return request.partition_stream_id();
+ return request.partition_session_id();
}
}();
Y_VERIFY(Partitions.find(id) != Partitions.end());
@@ -255,8 +255,8 @@ if (!partId.DiscoveryConverter->IsValid()) { \
ctx.Send(ctx.SelfID, new TEvPQProxy::TEvRead()); // Proto read message have no parameters
break;
}
- case TClientMessage::kPartitionStreamStatusRequest: {
- GET_PART_ID_OR_EXIT(request.partition_stream_status_request());
+ case TClientMessage::kPartitionSessionStatusRequest: {
+ GET_PART_ID_OR_EXIT(request.partition_session_status_request());
ctx.Send(ctx.SelfID, new TEvPQProxy::TEvGetStatus(partId));
if (!Request->GetStreamCtx()->Read()) {
LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " grpc read failed at start");
@@ -266,8 +266,8 @@ if (!partId.DiscoveryConverter->IsValid()) { \
break;
}
- case TClientMessage::kDestroyPartitionStreamResponse: {
- GET_PART_ID_OR_EXIT(request.destroy_partition_stream_response());
+ case TClientMessage::kStopPartitionSessionResponse: {
+ GET_PART_ID_OR_EXIT(request.stop_partition_session_response());
ctx.Send(ctx.SelfID, new TEvPQProxy::TEvReleased(partId));
if (!Request->GetStreamCtx()->Read()) {
LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " grpc read failed at start");
@@ -277,14 +277,14 @@ if (!partId.DiscoveryConverter->IsValid()) { \
break;
}
- case TClientMessage::kCreatePartitionStreamResponse: {
- const auto& req = request.create_partition_stream_response();
+ case TClientMessage::kStartPartitionSessionResponse: {
+ const auto& req = request.start_partition_session_response();
const ui64 readOffset = req.read_offset();
const ui64 commitOffset = req.commit_offset();
const bool verifyReadOffset = req.verify_read_offset();
- GET_PART_ID_OR_EXIT(request.create_partition_stream_response());
+ GET_PART_ID_OR_EXIT(request.start_partition_session_response());
ctx.Send(ctx.SelfID, new TEvPQProxy::TEvStartRead(partId, readOffset, commitOffset, verifyReadOffset));
if (!Request->GetStreamCtx()->Read()) {
LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " grpc read failed at start");
@@ -304,7 +304,7 @@ if (!partId.DiscoveryConverter->IsValid()) { \
THashMap<ui64, TEvPQProxy::TCommitRange> commitRange;
for (auto& pc: req.commits()) {
- auto id = pc.partition_stream_id();
+ auto id = pc.partition_session_id();
for (auto& c: pc.offsets()) {
commitRange[id].Ranges.push_back(std::make_pair(c.start_offset(), c.end_offset()));
}
@@ -579,7 +579,7 @@ void TReadSessionActor<UseMigrationProtocol>::Handle(TEvPQProxy::TEvCommitDone::
} else {
auto c = result.mutable_commit_response()->add_partitions_committed_offsets();
- c->set_partition_stream_id(assignId);
+ c->set_partition_session_id(assignId);
c->set_committed_offset(ev->Get()->Offset);
}
}
@@ -660,10 +660,8 @@ void TReadSessionActor<UseMigrationProtocol>::Handle(typename TEvReadInit::TPtr&
MaxReadSize = NormalizeMaxReadSize(init.read_params().max_read_size());
} else {
RangesMode = true;
- // MaxReadMessagesCount = NormalizeMaxReadMessagesCount(0);
- // MaxReadSize = NormalizeMaxReadSize(0);
- MaxReadMessagesCount = NormalizeMaxReadMessagesCount(init.read_params().max_read_messages_count());
- MaxReadSize = NormalizeMaxReadSize(init.read_params().max_read_size());
+ MaxReadMessagesCount = NormalizeMaxReadMessagesCount(0);
+ MaxReadSize = NormalizeMaxReadSize(0);
}
if (init.max_lag_duration_ms() < 0) {
CloseSession("max_lag_duration_ms must be nonnegative number", PersQueue::ErrorCode::BAD_REQUEST, ctx);
@@ -1069,13 +1067,13 @@ void TReadSessionActor<UseMigrationProtocol>::Handle(TEvPQProxy::TEvPartitionSta
result.mutable_assigned()->set_end_offset(ev->Get()->EndOffset);
} else {
- result.mutable_create_partition_stream_request()->mutable_partition_stream()->set_topic(it->second.Topic->GetFederationPath());
- result.mutable_create_partition_stream_request()->mutable_partition_stream()->set_cluster(it->second.Topic->GetCluster());
- result.mutable_create_partition_stream_request()->mutable_partition_stream()->set_partition_id(ev->Get()->Partition.Partition);
- result.mutable_create_partition_stream_request()->mutable_partition_stream()->set_partition_stream_id(it->first);
+ result.mutable_start_partition_session_request()->mutable_partition_session()->set_topic(it->second.Topic->GetFederationPath());
+ result.mutable_start_partition_session_request()->mutable_partition_session()->set_cluster(it->second.Topic->GetCluster());
+ result.mutable_start_partition_session_request()->mutable_partition_session()->set_partition_id(ev->Get()->Partition.Partition);
+ result.mutable_start_partition_session_request()->mutable_partition_session()->set_partition_session_id(it->first);
- result.mutable_create_partition_stream_request()->set_committed_offset(ev->Get()->Offset);
- result.mutable_create_partition_stream_request()->set_end_offset(ev->Get()->EndOffset);
+ result.mutable_start_partition_session_request()->set_committed_offset(ev->Get()->Offset);
+ result.mutable_start_partition_session_request()->set_end_offset(ev->Get()->EndOffset);
}
LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " sending to client create partition stream event");
@@ -1110,11 +1108,11 @@ void TReadSessionActor<UseMigrationProtocol>::Handle(TEvPQProxy::TEvPartitionSta
result.mutable_partition_status()->set_write_watermark_ms(ev->Get()->WriteTimestampEstimateMs);
} else {
- result.mutable_partition_stream_status_response()->set_partition_stream_id(it->first);
+ result.mutable_partition_session_status_response()->set_partition_session_id(it->first);
- result.mutable_partition_stream_status_response()->set_committed_offset(ev->Get()->Offset);
- result.mutable_partition_stream_status_response()->set_end_offset(ev->Get()->EndOffset);
- result.mutable_partition_stream_status_response()->set_written_at_watermark_ms(ev->Get()->WriteTimestampEstimateMs);
+ result.mutable_partition_session_status_response()->set_committed_offset(ev->Get()->Offset);
+ result.mutable_partition_session_status_response()->set_end_offset(ev->Get()->EndOffset);
+ result.mutable_partition_session_status_response()->set_written_at_watermark_ms(ev->Get()->WriteTimestampEstimateMs);
}
auto pp = it->second.Partition;
@@ -1154,9 +1152,9 @@ void TReadSessionActor<UseMigrationProtocol>::SendReleaseSignalToClient(const ty
result.mutable_release()->set_commit_offset(it->second.Offset);
} else {
- result.mutable_destroy_partition_stream_request()->set_partition_stream_id(it->second.Partition.AssignId);
- result.mutable_destroy_partition_stream_request()->set_graceful(!kill);
- result.mutable_destroy_partition_stream_request()->set_committed_offset(it->second.Offset);
+ result.mutable_stop_partition_session_request()->set_partition_session_id(it->second.Partition.AssignId);
+ result.mutable_stop_partition_session_request()->set_graceful(!kill);
+ result.mutable_stop_partition_session_request()->set_committed_offset(it->second.Offset);
}
auto pp = it->second.Partition;
@@ -1478,10 +1476,18 @@ void TReadSessionActor<UseMigrationProtocol>::Handle(TEvPQProxy::TEvRead::TPtr&
template<typename TServerMessage>
i64 TFormedReadResponse<TServerMessage>::ApplyResponse(TServerMessage&& resp) {
- Y_VERIFY(resp.data_batch().partition_data_size() == 1);
+ constexpr bool UseMigrationProtocol = std::is_same_v<TServerMessage, PersQueue::V1::MigrationStreamingReadServerMessage>;
+ if constexpr (UseMigrationProtocol) {
+ Y_VERIFY(resp.data_batch().partition_data_size() == 1);
+ Response.mutable_data_batch()->add_partition_data()->Swap(resp.mutable_data_batch()->mutable_partition_data(0));
+
+ } else {
+ Y_VERIFY(resp.read_response().partition_data_size() == 1);
+ Response.mutable_read_response()->add_partition_data()->Swap(resp.mutable_read_response()->mutable_partition_data(0));
+ }
+
Response.set_status(Ydb::StatusIds::SUCCESS);
- Response.mutable_data_batch()->add_partition_data()->Swap(resp.mutable_data_batch()->mutable_partition_data(0));
i64 prev = Response.ByteSize();
std::swap<i64>(prev, ByteSize);
return ByteSize - prev;
@@ -1495,16 +1501,27 @@ void TReadSessionActor<UseMigrationProtocol>::Handle(typename TEvReadResponse::T
THolder<TEvReadResponse> event(ev->Release());
- Y_VERIFY(event->Response.data_batch().partition_data_size() == 1);
- const ui64 partitionCookie = event->Response.data_batch().partition_data(0).cookie().partition_cookie();
- Y_VERIFY(partitionCookie != 0); // cookie is assigned
- const ui64 assignId = event->Response.data_batch().partition_data(0).cookie().assign_id();
+ ui64 partitionCookie;
+ ui64 assignId;
+ if constexpr (UseMigrationProtocol) {
+ Y_VERIFY(event->Response.data_batch().partition_data_size() == 1);
+ partitionCookie = event->Response.data_batch().partition_data(0).cookie().partition_cookie();
+ Y_VERIFY(partitionCookie != 0); // cookie is assigned
+ assignId = event->Response.data_batch().partition_data(0).cookie().assign_id();
+
+ } else {
+ Y_VERIFY(event->Response.read_response().partition_data_size() == 1);
+ assignId = event->Response.read_response().partition_data(0).partition_session_id();
+ }
+
const auto partitionIt = Partitions.find(assignId);
Y_VERIFY(partitionIt != Partitions.end());
Y_VERIFY(partitionIt->second.Reading);
partitionIt->second.Reading = false;
- partitionIt->second.ReadIdToResponse = partitionCookie + 1;
+ if constexpr (UseMigrationProtocol) {
+ partitionIt->second.ReadIdToResponse = partitionCookie + 1;
+ }
auto it = PartitionToReadResponse.find(sender);
Y_VERIFY(it != PartitionToReadResponse.end());
@@ -1558,7 +1575,13 @@ void TReadSessionActor<UseMigrationProtocol>::ProcessAnswer(const TActorContext&
Y_VERIFY(formedResponse->RequestsInfly == 0);
const ui64 diff = formedResponse->Response.ByteSize();
- const bool hasMessages = RemoveEmptyMessages(*formedResponse->Response.mutable_data_batch());
+ bool hasMessages;
+ if constexpr(UseMigrationProtocol) {
+ hasMessages = RemoveEmptyMessages(*formedResponse->Response.mutable_data_batch());
+ } else {
+ hasMessages = RemoveEmptyMessages(*formedResponse->Response.mutable_read_response());
+ }
+
if (hasMessages) {
LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " response to read " << formedResponse->Guid);
diff --git a/ydb/services/persqueue_v1/persqueue_ut.cpp b/ydb/services/persqueue_v1/persqueue_ut.cpp
index 9fedc4d6627..93b846a20c2 100644
--- a/ydb/services/persqueue_v1/persqueue_ut.cpp
+++ b/ydb/services/persqueue_v1/persqueue_ut.cpp
@@ -359,7 +359,6 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
req.mutable_init_request()->set_consumer("user");
req.mutable_init_request()->set_read_only_original(true);
- req.mutable_init_request()->mutable_read_params()->set_max_read_messages_count(1);
if (!readStream->Write(req)) {
ythrow yexception() << "write fail";
@@ -369,7 +368,7 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
UNIT_ASSERT(resp.server_message_case() == StreamingReadServerMessage::kInitResponse);
//send some reads
req.Clear();
- req.mutable_read_request();
+ req.mutable_read_request()->set_request_uncompressed_size(256);
for (ui32 i = 0; i < 10; ++i) {
if (!readStream->Write(req)) {
ythrow yexception() << "write fail";
@@ -387,31 +386,31 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
TVector<i64> partition_ids;
//lock partition
UNIT_ASSERT(readStream->Read(&resp));
- UNIT_ASSERT(resp.server_message_case() == StreamingReadServerMessage::kCreatePartitionStreamRequest);
- UNIT_ASSERT(resp.create_partition_stream_request().partition_stream().topic() == "acc/topic1");
- UNIT_ASSERT(resp.create_partition_stream_request().partition_stream().cluster() == "dc1");
- partition_ids.push_back(resp.create_partition_stream_request().partition_stream().partition_id());
+ UNIT_ASSERT(resp.server_message_case() == StreamingReadServerMessage::kStartPartitionSessionRequest);
+ UNIT_ASSERT(resp.start_partition_session_request().partition_session().topic() == "acc/topic1");
+ UNIT_ASSERT(resp.start_partition_session_request().partition_session().cluster() == "dc1");
+ partition_ids.push_back(resp.start_partition_session_request().partition_session().partition_id());
- assignId = resp.create_partition_stream_request().partition_stream().partition_stream_id();
+ assignId = resp.start_partition_session_request().partition_session().partition_session_id();
req.Clear();
- req.mutable_create_partition_stream_response()->set_partition_stream_id(assignId);
+ req.mutable_start_partition_session_response()->set_partition_session_id(assignId);
if (!readStream->Write(req)) {
ythrow yexception() << "write fail";
}
resp.Clear();
UNIT_ASSERT(readStream->Read(&resp));
- UNIT_ASSERT(resp.server_message_case() == StreamingReadServerMessage::kCreatePartitionStreamRequest);
- UNIT_ASSERT(resp.create_partition_stream_request().partition_stream().topic() == "acc/topic1");
- UNIT_ASSERT(resp.create_partition_stream_request().partition_stream().cluster() == "dc1");
- partition_ids.push_back(resp.create_partition_stream_request().partition_stream().partition_id());
+ UNIT_ASSERT(resp.server_message_case() == StreamingReadServerMessage::kStartPartitionSessionRequest);
+ UNIT_ASSERT(resp.start_partition_session_request().partition_session().topic() == "acc/topic1");
+ UNIT_ASSERT(resp.start_partition_session_request().partition_session().cluster() == "dc1");
+ partition_ids.push_back(resp.start_partition_session_request().partition_session().partition_id());
std::sort(partition_ids.begin(), partition_ids.end());
UNIT_ASSERT((partition_ids == TVector<i64>{0, 1}));
- assignId = resp.create_partition_stream_request().partition_stream().partition_stream_id();
+ assignId = resp.start_partition_session_request().partition_session().partition_session_id();
req.Clear();
- req.mutable_create_partition_stream_response()->set_partition_stream_id(assignId);
+ req.mutable_start_partition_session_response()->set_partition_session_id(assignId);
if (!readStream->Write(req)) {
ythrow yexception() << "write fail";
@@ -433,7 +432,6 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
req.mutable_init_request()->set_consumer("user");
req.mutable_init_request()->set_read_only_original(true);
- req.mutable_init_request()->mutable_read_params()->set_max_read_messages_count(1);
if (!readStreamSecond->Write(req)) {
ythrow yexception() << "write fail";
@@ -453,25 +451,25 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
//lock partition
UNIT_ASSERT(readStream->Read(&resp));
Cerr << "=== Got response (expect destroy): " << resp.ShortDebugString() << Endl;
- UNIT_ASSERT(resp.server_message_case() == StreamingReadServerMessage::kDestroyPartitionStreamRequest);
- UNIT_ASSERT(resp.destroy_partition_stream_request().graceful());
- auto stream_id = resp.destroy_partition_stream_request().partition_stream_id();
+ UNIT_ASSERT(resp.server_message_case() == StreamingReadServerMessage::kStopPartitionSessionRequest);
+ UNIT_ASSERT(resp.stop_partition_session_request().graceful());
+ auto stream_id = resp.stop_partition_session_request().partition_session_id();
req.Clear();
- req.mutable_destroy_partition_stream_response()->set_partition_stream_id(stream_id);
+ req.mutable_stop_partition_session_response()->set_partition_session_id(stream_id);
if (!readStream->Write(req)) {
ythrow yexception() << "write fail";
}
resp.Clear();
UNIT_ASSERT(readStreamSecond->Read(&resp));
Cerr << "=== Got response (expect create): " << resp.ShortDebugString() << Endl;
- UNIT_ASSERT(resp.server_message_case() == StreamingReadServerMessage::kCreatePartitionStreamRequest);
- UNIT_ASSERT(resp.create_partition_stream_request().partition_stream().topic() == "acc/topic1");
- UNIT_ASSERT(resp.create_partition_stream_request().partition_stream().cluster() == "dc1");
+ UNIT_ASSERT(resp.server_message_case() == StreamingReadServerMessage::kStartPartitionSessionRequest);
+ UNIT_ASSERT(resp.start_partition_session_request().partition_session().topic() == "acc/topic1");
+ UNIT_ASSERT(resp.start_partition_session_request().partition_session().cluster() == "dc1");
- assignId = resp.create_partition_stream_request().partition_stream().partition_stream_id();
+ assignId = resp.start_partition_session_request().partition_session().partition_session_id();
req.Clear();
- req.mutable_create_partition_stream_response()->set_partition_stream_id(assignId);
+ req.mutable_start_partition_session_response()->set_partition_session_id(assignId);
if (!readStreamSecond->Write(req)) {
ythrow yexception() << "write fail";
@@ -488,14 +486,14 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
//lock partition
UNIT_ASSERT(readStream->Read(&resp));
Cerr << "=== Got response (expect forceful destroy): " << resp.ShortDebugString() << Endl;
- UNIT_ASSERT(resp.server_message_case() == StreamingReadServerMessage::kDestroyPartitionStreamRequest);
- UNIT_ASSERT(!resp.destroy_partition_stream_request().graceful());
+ UNIT_ASSERT(resp.server_message_case() == StreamingReadServerMessage::kStopPartitionSessionRequest);
+ UNIT_ASSERT(!resp.stop_partition_session_request().graceful());
resp.Clear();
UNIT_ASSERT(readStreamSecond->Read(&resp));
Cerr << "=== Got response (expect forceful destroy): " << resp.ShortDebugString() << Endl;
- UNIT_ASSERT(resp.server_message_case() == StreamingReadServerMessage::kDestroyPartitionStreamRequest);
- UNIT_ASSERT(!resp.destroy_partition_stream_request().graceful());
+ UNIT_ASSERT(resp.server_message_case() == StreamingReadServerMessage::kStopPartitionSessionRequest);
+ UNIT_ASSERT(!resp.stop_partition_session_request().graceful());
}
}
@@ -520,7 +518,6 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
req.mutable_init_request()->set_consumer("user");
req.mutable_init_request()->set_read_only_original(true);
- req.mutable_init_request()->mutable_read_params()->set_max_read_messages_count(1);
if (!readStream->Write(req)) {
ythrow yexception() << "write fail";
@@ -530,7 +527,7 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
UNIT_ASSERT(resp.server_message_case() == StreamingReadServerMessage::kInitResponse);
//send some reads
req.Clear();
- req.mutable_read_request();
+ req.mutable_read_request()->set_request_uncompressed_size(256);
for (ui32 i = 0; i < 10; ++i) {
if (!readStream->Write(req)) {
ythrow yexception() << "write fail";
@@ -546,16 +543,16 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
//lock partition
UNIT_ASSERT(readStream->Read(&resp));
- UNIT_ASSERT(resp.server_message_case() == StreamingReadServerMessage::kCreatePartitionStreamRequest);
- UNIT_ASSERT(resp.create_partition_stream_request().partition_stream().topic() == "acc/topic1");
- UNIT_ASSERT(resp.create_partition_stream_request().partition_stream().cluster() == "dc1");
- UNIT_ASSERT(resp.create_partition_stream_request().partition_stream().partition_id() == 0);
+ UNIT_ASSERT(resp.server_message_case() == StreamingReadServerMessage::kStartPartitionSessionRequest);
+ UNIT_ASSERT(resp.start_partition_session_request().partition_session().topic() == "acc/topic1");
+ UNIT_ASSERT(resp.start_partition_session_request().partition_session().cluster() == "dc1");
+ UNIT_ASSERT(resp.start_partition_session_request().partition_session().partition_id() == 0);
- assignId = resp.create_partition_stream_request().partition_stream().partition_stream_id();
+ assignId = resp.start_partition_session_request().partition_session().partition_session_id();
req.Clear();
- req.mutable_create_partition_stream_response()->set_partition_stream_id(assignId);
+ req.mutable_start_partition_session_response()->set_partition_session_id(assignId);
- req.mutable_create_partition_stream_response()->set_read_offset(10);
+ req.mutable_start_partition_session_response()->set_read_offset(10);
if (!readStream->Write(req)) {
ythrow yexception() << "write fail";
}
@@ -575,14 +572,15 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
//check read results
StreamingReadServerMessage resp;
- for (ui32 i = 10; i < 16; ++i) {
+ for (ui32 i = 10; i < 16; ) {
UNIT_ASSERT(readStream->Read(&resp));
Cerr << "Got read response " << resp << "\n";
- UNIT_ASSERT_C(resp.server_message_case() == StreamingReadServerMessage::kDataBatch, resp);
- UNIT_ASSERT(resp.data_batch().partition_data_size() == 1);
- UNIT_ASSERT(resp.data_batch().partition_data(0).batches_size() == 1);
- UNIT_ASSERT(resp.data_batch().partition_data(0).batches(0).message_data_size() == 1);
- UNIT_ASSERT(resp.data_batch().partition_data(0).batches(0).message_data(0).offset() == i);
+ UNIT_ASSERT_C(resp.server_message_case() == StreamingReadServerMessage::kReadResponse, resp);
+ UNIT_ASSERT(resp.read_response().partition_data_size() == 1);
+ UNIT_ASSERT(resp.read_response().partition_data(0).batches_size() == 1);
+ UNIT_ASSERT(resp.read_response().partition_data(0).batches(0).message_data_size() >= 1);
+ UNIT_ASSERT(resp.read_response().partition_data(0).batches(0).message_data(0).offset() == i);
+ i += resp.read_response().partition_data(0).batches(0).message_data_size();
}
// send commit, await commitDone
@@ -591,7 +589,7 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
StreamingReadServerMessage resp;
auto commit = req.mutable_commit_request()->add_commits();
- commit->set_partition_stream_id(assignId);
+ commit->set_partition_session_id(assignId);
auto offsets = commit->add_offsets();
offsets->set_start_offset(0);
@@ -603,7 +601,7 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
UNIT_ASSERT(readStream->Read(&resp));
UNIT_ASSERT_C(resp.server_message_case() == StreamingReadServerMessage::kCommitResponse, resp);
UNIT_ASSERT(resp.commit_response().partitions_committed_offsets_size() == 1);
- UNIT_ASSERT(resp.commit_response().partitions_committed_offsets(0).partition_stream_id() == assignId);
+ UNIT_ASSERT(resp.commit_response().partitions_committed_offsets(0).partition_session_id() == assignId);
UNIT_ASSERT(resp.commit_response().partitions_committed_offsets(0).committed_offset() == 13);
}
@@ -612,17 +610,17 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
StreamingReadClientMessage req;
StreamingReadServerMessage resp;
- req.mutable_partition_stream_status_request()->set_partition_stream_id(assignId);
+ req.mutable_partition_session_status_request()->set_partition_session_id(assignId);
if (!readStream->Write(req)) {
ythrow yexception() << "write fail";
}
UNIT_ASSERT(readStream->Read(&resp));
- UNIT_ASSERT_C(resp.server_message_case() == StreamingReadServerMessage::kPartitionStreamStatusResponse, resp);
- UNIT_ASSERT(resp.partition_stream_status_response().partition_stream_id() == assignId);
- UNIT_ASSERT(resp.partition_stream_status_response().committed_offset() == 13);
- UNIT_ASSERT(resp.partition_stream_status_response().end_offset() == 16);
- UNIT_ASSERT(resp.partition_stream_status_response().written_at_watermark_ms() > 0);
+ UNIT_ASSERT_C(resp.server_message_case() == StreamingReadServerMessage::kPartitionSessionStatusResponse, resp);
+ UNIT_ASSERT(resp.partition_session_status_response().partition_session_id() == assignId);
+ UNIT_ASSERT(resp.partition_session_status_response().committed_offset() == 13);
+ UNIT_ASSERT(resp.partition_session_status_response().end_offset() == 16);
+ UNIT_ASSERT(resp.partition_session_status_response().written_at_watermark_ms() > 0);
}
// send update token request, await response
@@ -956,8 +954,6 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
req.mutable_init_request()->set_consumer("user");
req.mutable_init_request()->set_read_only_original(true);
- req.mutable_init_request()->mutable_read_params()->set_max_read_messages_count(1000);
-
if (!readStream->Write(req)) {
ythrow yexception() << "write fail";
}
@@ -971,7 +967,7 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
Sleep(TDuration::Seconds(5));
for (ui32 i = 0; i < 10; ++i) {
req.Clear();
- req.mutable_read_request();
+ req.mutable_read_request()->set_request_uncompressed_size(256000);
if (!readStream->Write(req)) {
ythrow yexception() << "write fail";
@@ -984,16 +980,16 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
for (ui32 i = 0; i < 2;) {
StreamingReadServerMessage resp;
UNIT_ASSERT(readStream->Read(&resp));
- if (resp.server_message_case() == StreamingReadServerMessage::kCreatePartitionStreamRequest) {
- auto assignId = resp.create_partition_stream_request().partition_stream().partition_stream_id();
+ if (resp.server_message_case() == StreamingReadServerMessage::kStartPartitionSessionRequest) {
+ auto assignId = resp.start_partition_session_request().partition_session().partition_session_id();
StreamingReadClientMessage req;
- req.mutable_create_partition_stream_response()->set_partition_stream_id(assignId);
+ req.mutable_start_partition_session_response()->set_partition_session_id(assignId);
UNIT_ASSERT(readStream->Write(req));
continue;
}
- UNIT_ASSERT_C(resp.server_message_case() == StreamingReadServerMessage::kDataBatch, resp);
- i += resp.data_batch().partition_data_size();
+ UNIT_ASSERT_C(resp.server_message_case() == StreamingReadServerMessage::kReadResponse, resp);
+ i += resp.read_response().partition_data_size();
}
}
diff --git a/ydb/services/persqueue_v1/ut/pq_data_writer.h b/ydb/services/persqueue_v1/ut/pq_data_writer.h
index caef8940601..34e7f3bf94d 100644
--- a/ydb/services/persqueue_v1/ut/pq_data_writer.h
+++ b/ydb/services/persqueue_v1/ut/pq_data_writer.h
@@ -244,7 +244,7 @@ private:
req.mutable_init_request()->set_topic(topic);
req.mutable_init_request()->set_message_group_id(SourceId_);
- req.mutable_init_request()->set_max_supported_block_format_version(0);
+ req.mutable_init_request()->set_max_supported_format_version(0);
(*req.mutable_init_request()->mutable_session_meta())["key"] = "value";