diff options
author | ildar-khisam <[email protected]> | 2022-06-16 19:44:06 +0300 |
---|---|---|
committer | ildar-khisam <[email protected]> | 2022-06-16 19:44:06 +0300 |
commit | b4f35ae1d3583d06a2997ff4063c6613e1c7aaab (patch) | |
tree | 681a147c3a46a8b650a18ea46610c5dc51464bc9 | |
parent | a6f15ab7c05a0b9968f8095bef8663d3efdd9fd5 (diff) |
topic service public grpc api
fix control messages issues
ref:5ca10d7c0bffb0760bd3784e0b08cb8dd2e55ed4
-rw-r--r-- | ydb/public/api/grpc/draft/ydb_topic_v1.proto | 21 | ||||
-rw-r--r-- | ydb/public/api/protos/CMakeLists.txt | 1 | ||||
-rw-r--r-- | ydb/public/api/protos/ydb_topic.proto | 305 |
3 files changed, 188 insertions, 139 deletions
diff --git a/ydb/public/api/grpc/draft/ydb_topic_v1.proto b/ydb/public/api/grpc/draft/ydb_topic_v1.proto index 23219ef6eff..948b8e36e67 100644 --- a/ydb/public/api/grpc/draft/ydb_topic_v1.proto +++ b/ydb/public/api/grpc/draft/ydb_topic_v1.proto @@ -64,26 +64,17 @@ service TopicService { rpc StreamRead(stream StreamReadMessage.FromClient) returns (stream StreamReadMessage.FromServer); - // Describe topic command. - rpc DescribeTopic(DescribeTopicRequest) returns (DescribeTopicResponse); - - - // Drop topic command. - rpc DropTopic(DropTopicRequest) returns (DropTopicResponse); - - // Create topic command. rpc CreateTopic(CreateTopicRequest) returns (CreateTopicResponse); - // Alter topic command. - rpc AlterTopic(AlterTopicRequest) returns (AlterTopicResponse); + // Describe topic command. + rpc DescribeTopic(DescribeTopicRequest) returns (DescribeTopicResponse); - // Add consumer command. - rpc AddConsumer(AddConsumerRequest) returns (AddConsumerResponse); + // Alter topic command. + rpc AlterTopic(AlterTopicRequest) returns (AlterTopicResponse); - // Drop consumer command. - rpc DropConsumer(RemoveConsumerRequest) returns (RemoveConsumerResponse); -} + // Drop topic command. + rpc DropTopic(DropTopicRequest) returns (DropTopicResponse); diff --git a/ydb/public/api/protos/CMakeLists.txt b/ydb/public/api/protos/CMakeLists.txt index 0f093e35576..a573bf200a4 100644 --- a/ydb/public/api/protos/CMakeLists.txt +++ b/ydb/public/api/protos/CMakeLists.txt @@ -45,7 +45,6 @@ target_proto_messages(api-protos PRIVATE ${CMAKE_SOURCE_DIR}/ydb/public/api/protos/ydb_table.proto ${CMAKE_SOURCE_DIR}/ydb/public/api/protos/ydb_value.proto ${CMAKE_SOURCE_DIR}/ydb/public/api/protos/ydb_s3_internal.proto - ${CMAKE_SOURCE_DIR}/ydb/public/api/protos/ydb_topic.proto ${CMAKE_SOURCE_DIR}/ydb/public/api/protos/yq.proto ) generate_enum_serilization(api-protos diff --git a/ydb/public/api/protos/ydb_topic.proto b/ydb/public/api/protos/ydb_topic.proto index 8d940a5d029..507d1222346 100644 --- a/ydb/public/api/protos/ydb_topic.proto +++ b/ydb/public/api/protos/ydb_topic.proto @@ -22,6 +22,14 @@ enum Codec { CODEC_LZOP = 3; CODEC_ZSTD = 4; CODEC_CUSTOM = 10000; + reserved 20000 to max; +} + +// Description of supported codecs. +message SupportedCodecs { + // List of supported codecs. + // See enum Codec above for values. + repeated int32 codecs = 1; } // Represents range [start, end). @@ -40,6 +48,11 @@ message UpdateTokenRequest { message UpdateTokenResponse { } + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +// StreamWrite + + // Messages for bidirectional streaming rpc StreamWrite message StreamWriteMessage { // Client-server message for write session. Contains one of: @@ -99,7 +112,8 @@ message StreamWriteMessage { // Identifier of partition that is matched for this write session. int64 partition_id = 3; - // Client can only use compression codecs from this set to write messages to topic, session will be closed with BAD_REQUEST otherwise. + // Client can only use compression codecs from this set to write messages to topic. + // Otherwise session will be closed with BAD_REQUEST. SupportedCodecs supported_codecs = 4; } @@ -109,7 +123,7 @@ message StreamWriteMessage { // Codec that is used for data compression. // See enum Codec above for values. - int64 codec = 2; + int32 codec = 2; message MessageData { // Message sequence number, provided by client for deduplication. @@ -131,7 +145,8 @@ message StreamWriteMessage { repeated WriteAck acks = 1; // Assigned partition for all client messages inside this batch. - // This actual partition may differ from that returned in InitResponse or other WriteResponses in this write session. + // This actual partition may differ from that returned in InitResponse + // or other WriteResponses in this write session. int64 partition_id = 2; // Write statistics for this sequence of client messages. @@ -154,13 +169,12 @@ message StreamWriteMessage { } message Skipped { + Reason reason = 1; + enum Reason { REASON_UNSPECIFIED = 0; REASON_ALREADY_WRITTEN = 1; } - - // See enum Reason above for values. - int64 reason = 1; } } @@ -180,6 +194,11 @@ message StreamWriteMessage { } } + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +// StreamRead + + // Within a StreamRead session delivered messages are separated by partition. // Reads from a single partition are represented by a partition session. message PartitionSession { @@ -191,7 +210,7 @@ message PartitionSession { int64 partition_id = 3; } -// Messages for bidirectional streaming rpc StreamingRead +// Messages for bidirectional streaming rpc StreamRead message StreamReadMessage { // Client-server message for read session. Contains one of: // InitRequest - handshake request. @@ -267,6 +286,7 @@ message StreamReadMessage { // Zero means infinite lag. google.protobuf.Duration max_lag = 3; // Read data only after this timestamp from this topic. + // Read only messages with 'written_at' value greater or equal than this timestamp. google.protobuf.Timestamp read_from = 4; } } @@ -280,19 +300,24 @@ message StreamReadMessage { // Message that represents client readiness for receiving more data. message ReadRequest { // Server and client each keep track of total bytes size of all ReadResponses. - // When client is ready to receive N more bytes in responses (to increment possible total by N), it sends a ReadRequest with bytes_size = N. - // - // Example: - // 1) Let client have 200 B buffer. It sends ReadRequest with bytes_size = 200; - // 2) Server returns ReadResponse with bytes_size = 150; now client buffer has 50 free bytes, server is free to send up to 50 bytes in responses. - // 3) Client processes 100 bytes from buffer, now buffer free space is 150 bytes. Client sends ReadRequest with bytes_size = 100; - // 4) Server is free to send up to 50 + 100 = 150 bytes. It may, for example, - // send one 70 B response and another 80 B response without receiving additional requests. - // + // When client is ready to receive N more bytes in responses (to increment possible total by N), + // it sends a ReadRequest with bytes_size = N. + // bytes_size value must be positive. // So in expression 'A = (sum of bytes_size in all ReadRequests) - (sum of bytes_size in all ReadResponses)' // server will keep A (available size for responses) non-negative. // But there is an exception. If server receives ReadRequest, and the first message in response exceeds A - - // then it will still be delivered, and A will become negative until client sends enough additional ReadRequests. + // then it will still be delivered, and A will become negative until enough additional ReadRequests. + // + // Example: + // 1) Let client have 200 bytes buffer. It sends ReadRequest with bytes_size = 200; + // 2) Server may return one ReadResponse with bytes_size = 70 and than another 80 bytes response; + // now client buffer has 50 free bytes, server is free to send up to 50 bytes in responses. + // 3) Client processes 100 bytes from buffer, now buffer free space is 150 bytes, + // so client sends ReadRequest with bytes_size = 100; + // 4) Server is free to send up to 50 + 100 = 150 bytes. But the next read message is too big, + // and it sends 160 bytes ReadResponse. + // 5) Let's assume client somehow processes it, and its 200 bytes buffer is free again. + // It shoud account for excess 10 bytes and send ReadRequest with bytes_size = 210. int64 bytes_size = 1; } @@ -328,7 +353,7 @@ message StreamReadMessage { // Codec that is used for data compression. // See enum Codec above for values. - int64 codec = 4; + int32 codec = 4; // Persist timestamp on server for batch. google.protobuf.Timestamp written_at = 5; @@ -390,8 +415,7 @@ message StreamReadMessage { // Partition contains messages with offsets in range [start, end). OffsetsRange partition_offsets = 2; - // Upper bound for committed offsets. - // TODO example (here or in CommitRequest) + // Each offset up to and including (committed_offset - 1) was committed. int64 committed_offset = 3; // Write timestamp of next message written to this partition will be no less than write_time_high_watermark. @@ -399,13 +423,12 @@ message StreamReadMessage { } // Command from server to create and start a partition session. - // Client must react on this command by sending StartPartitionSessionResponse when ready to recieve data from this partition. + // Client must respond with StartPartitionSessionResponse when ready to receive data from this partition. message StartPartitionSessionRequest { // Partition session description. PartitionSession partition_session = 1; - // Actual upper bound for all committed offsets. - // I.e. each offset up to and including (committed_offset - 1) was committed. + // Each offset up to and including (committed_offset - 1) was committed. int64 committed_offset = 2; // Partition contains messages with offsets in range [start, end). @@ -419,17 +442,16 @@ message StreamReadMessage { // Reads in this partition session will start from offset no less than read_offset. // If read_offset is set, server will check that read_offset is no less that actual committed offset. - // If check will fail then server will send a message describing error in status and issues fields, then close stream. + // If check fails then server will send an error message (status != SUCCESS) and close stream. // // If read_offset is not set, no check will be made. // InitRequest.max_lag and InitRequest.read_from could lead to skip of more messages. // Server will return data starting from offset that is maximum of actual committed offset, read_offset (if set) // and offsets calculated from InitRequest.max_lag and InitRequest.read_from. -// optional int64 read_offset = 2; - google.protobuf.Int64Value read_offset = 2; - // All messages with offset less than commit_offset are processed by client. Server will commit this position if this is not done yet. - //optional int64 commit_offset = 3; - google.protobuf.Int64Value commit_offset = 3; + optional int64 read_offset = 2; + // All messages with offset less than commit_offset are processed by client. + // Server will commit this position if this is not done yet. + optional int64 commit_offset = 3; } // Command from server to stop and destroy concrete partition session. @@ -438,18 +460,19 @@ message StreamReadMessage { int64 partition_session_id = 1; // Flag of graceful stop. - // If True then server is waiting for response from client before giving of this partition for other read session. + // If set, server will wait for response from client before giving this partition to other read session. // Server will not send more data from this partition. // Client can process all received data and wait for commit and only after send response. // If False then server gives partition for other session right now. - // All further commits for this PartitionSession has no effect. Server is not waiting for response. + // All further commits for this partition session has no effect. Server is not waiting for response. bool graceful = 2; // Upper bound for committed offsets. int64 committed_offset = 3; } - // Signal for server that client finished working with this partition. Must be sent only after corresponding StopPartitionSessionRequest from server. + // Signal for server that client finished working with this partition. + // Must be sent only after corresponding StopPartitionSessionRequest from server. // Server will give this partition to other read session only after StopPartitionSessionResponse signal. message StopPartitionSessionResponse { // Partition session identifier of partition session that is released by client. @@ -458,30 +481,9 @@ message StreamReadMessage { } -// Drop topic request sent from client to server. -message DropTopicRequest { - Ydb.Operations.OperationParams operation_params = 1; - // Topic path. - string path = 2; -} - +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +// Control messages -// Drop topic response sent from server to client. If topic is not existed then response status will be "SCHEME_ERROR". -message DropTopicResponse { - // Result of request will be inside operation. - Ydb.Operations.Operation operation = 1; -} - -// Drop topic result message that will be inside DropTopicResponse.operation. -message DropTopicResult { -} - -// Description of supported codecs. -message SupportedCodecs { - // List of supported codecs. - // See enum Codec above for values. - repeated int32 codecs = 1; -} // Consumer description. message Consumer { @@ -502,7 +504,6 @@ message Consumer { map<string, string> attributes = 6; } - // Consumer alter description. message AlterConsumer { // Must have valid not empty name as a key. @@ -510,8 +511,7 @@ message AlterConsumer { // Consumer may be marked as 'important'. It means messages for this consumer will never expire due to retention. // User should take care that such consumer never stalls, to prevent running out of disk space. // Flag that this consumer is important. -// optional bool important = 2; - google.protobuf.BoolValue important = 2; + optional bool important = 2; // All messages with smaller server written_at timestamp will be skipped. google.protobuf.Timestamp read_from = 3; reserved 4; // supported_format @@ -525,47 +525,50 @@ message AlterConsumer { } -// Message for describing topic internals. -message TopicSettings { - oneof partitions { +// Create topic request sent from client to server. +message CreateTopicRequest { + Ydb.Operations.OperationParams operation_params = 1; + // Topic path. + string path = 2; + + // Topic settings. + oneof partitioning { + // Uniform partitioning case. // How many uniform partitions in topic. Must less than database limit. Default limit - 10. - int32 partitions_count = 1; - // 2 reserved for partition_at_keys + int64 partitions_count = 3; + // 4 reserved for partition_at_keys } + reserved 4; // partition_at_keys inside partitions oneof + + // Retention settings. + // Currently, only one limit may be set, so other should not be set. + // How long data in partition should be stored. Must be greater than 0 and less than limit for this database. // Default limit - 36 hours. - google.protobuf.Duration retention_period = 3; - // How much data in partition should be stored. Must be greater than 0 and less than limit for this database. Zero means infinite limit. - int64 retention_storage_mb = 4; - reserved 5; // supported_format. + google.protobuf.Duration retention_period = 5; + // How much data in partition should be stored. Must be greater than 0 and less than limit for this database. + // Zero value means infinite limit. + int64 retention_storage_mb = 6; + reserved 7; // supported_format. // List of allowed codecs for writers. // Writes with codec not from this list are forbidden. - SupportedCodecs supported_codecs = 6; - // Partition write speed in bytes per second. Must be less than database limit. Default limit - 1 MB/s - used when value is zero. - int64 partition_write_speed_bytes_per_second = 7; - // Burst size for write in partition, in bytes. Must be less than database limit. Default limit - 1 MB - used when value is zero. - int64 partition_write_burst_bytes = 8; + SupportedCodecs supported_codecs = 8; + // Partition write speed in bytes per second. Must be less than database limit. + // Zero value means default limit: 1 MB per second. + int64 partition_write_speed_bytes_per_second = 9; + // Burst size for write in partition, in bytes. Must be less than database limit. + // Zero value means default limit: 1 MB. + int64 partition_write_burst_bytes = 10; // User and server attributes of topic. Server attributes starts from "_" and will be validated by server. - map<string, string> attributes = 9; + map<string, string> attributes = 11; // List of consumers for this topic. - repeated Consumer consumers = 10; - -} - -// Create topic request sent from client to server. -message CreateTopicRequest { - Ydb.Operations.OperationParams operation_params = 1; - // Topic path. - string path = 2; - - // Topic settings. - TopicSettings settings = 3; + repeated Consumer consumers = 12; } - -// Create topic response sent from server to client. If topic is already exists then response status will be "ALREADY_EXISTS". +// Create topic response sent from server to client. +// If topic is already exists then response status will be "ALREADY_EXISTS". message CreateTopicResponse { // Result of request will be inside operation. Ydb.Operations.Operation operation = 1; @@ -575,78 +578,134 @@ message CreateTopicResponse { message CreateTopicResult { } -// Update existing topic request sent from client to server. -message AlterTopicRequest { + +// Describe topic request sent from client to server. +message DescribeTopicRequest { Ydb.Operations.OperationParams operation_params = 1; + // Topic path. string path = 2; - // New topic settings to be set. All options inside should be set despite same value. - TopicSettings settings = 4; } -// Update topic response sent from server to client. -message AlterTopicResponse { +// Describe topic response sent from server to client. +// If topic is not existed then response status will be "SCHEME_ERROR". +message DescribeTopicResponse { // Result of request will be inside operation. Ydb.Operations.Operation operation = 1; } -// Update topic result message that will be inside UpdateTopicResponse.operation. -message AlterTopicResult { - oneof partitions { +// Describe topic result message that will be inside DescribeTopicResponse.operation. +message DescribeTopicResult { + // Description of scheme object. + Ydb.Scheme.Entry self = 1; + + // Settings of topic. + + // Partitioning description. + oneof partitioning { + // Case of 'partitions_count' uniform partitions in topic. + int64 partitions_count = 2; + // 3 reserved for partition_at_keys + } + reserved 3; // partition_at_keys inside partitions oneof + + // Retention settings. + // Currently, only one limit may be set, so other should not be set. + + // How long data in partition should be stored. + google.protobuf.Duration retention_period = 4; + // How much data in partition should be stored. + // Zero value means infinite limit. + int64 retention_storage_mb = 5; + + reserved 6; // supported_format. + // List of allowed codecs for writers. + // Writes with codec not from this list are forbidden. + SupportedCodecs supported_codecs = 7; + // Partition write speed in bytes per second. + // Zero value means default limit: 1 MB per second. + int64 partition_write_speed_bytes_per_second = 8; + // Burst size for write in partition, in bytes. + // Zero value means default limit: 1 MB. + int64 partition_write_burst_bytes = 9; + + // User and server attributes of topic. Server attributes starts from "_" and will be validated by server. + map<string, string> attributes = 10; + + // List of consumers for this topic. + repeated Consumer consumers = 11; +} + + +// Update existing topic request sent from client to server. +message AlterTopicRequest { + Ydb.Operations.OperationParams operation_params = 1; + // Topic path. + string path = 2; + + // Partitioning setting. + oneof partitioning { // How many uniform partitions in topic. Must less than database limit. Default limit - 10. - int32 partitions_count = 1; - // 2 reserved for partition_at_keys + int64 partitions_count = 3; + // 4 reserved for partition_at_keys } + reserved 4; // partition_at_keys inside partitions oneof + + // Retention settings. + // Currently, only one limit may be set, so other should not be set. + // How long data in partition should be stored. Must be greater than 0 and less than limit for this database. // Default limit - 36 hours. - google.protobuf.Duration retention_period = 3; + google.protobuf.Duration retention_period = 5; // How much data in partition should be stored. Must be greater than 0 and less than limit for this database. -// optional int64 retention_storage_mb = 4; - google.protobuf.Int64Value retention_storage_mb = 4; - reserved 5; // supported_format. + optional int64 retention_storage_mb = 6; + reserved 7; // supported_format. // List of allowed codecs for writers. - // See enum Codec above for values. // Writes with codec not from this list are forbidden. - SupportedCodecs supported_codecs = 6; + SupportedCodecs supported_codecs = 8; // Partition write speed in bytes per second. Must be less than database limit. Default limit - 1 MB/s. -// optional int64 partition_write_speed_bytes_per_second = 7; - google.protobuf.Int64Value partition_write_speed_bytes_per_second = 7; + optional int64 partition_write_speed_bytes_per_second = 9; // Burst size for write in partition, in bytes. Must be less than database limit. Default limit - 1 MB. -// optional int64 partition_write_burst_bytes = 8; - google.protobuf.Int64Value partition_write_burst_bytes = 8; + optional int64 partition_write_burst_bytes = 10; // User and server attributes of topic. Server attributes starts from "_" and will be validated by server. // Leave the value blank to drop an attribute. - map<string, string> alter_attributes = 9; + map<string, string> alter_attributes = 11; // Add consumers. - repeated Consumer add_consumers = 10; + repeated Consumer add_consumers = 12; // Remove consumers (by its names) - repeated string drop_consumers = 11; + repeated string drop_consumers = 13; // Alter consumers - repeated AlterConsumer alter_consumers = 12; + repeated AlterConsumer alter_consumers = 14; } +// Update topic response sent from server to client. +message AlterTopicResponse { + // Result of request will be inside operation. + Ydb.Operations.Operation operation = 1; +} + +// Update topic result message that will be inside UpdateTopicResponse.operation. +message AlterTopicResult { +} -// Describe topic request sent from client to server. -message DescribeTopicRequest { - Ydb.Operations.OperationParams operation_params = 1; +// Drop topic request sent from client to server. +message DropTopicRequest { + Ydb.Operations.OperationParams operation_params = 1; // Topic path. string path = 2; } -// Describe topic response sent from server to client. If topic is not existed then response status will be "SCHEME_ERROR". -message DescribeTopicResponse { +// Drop topic response sent from server to client. +// If topic not exists then response status will be "SCHEME_ERROR". +message DropTopicResponse { // Result of request will be inside operation. Ydb.Operations.Operation operation = 1; } -// Describe topic result message that will be inside DescribeTopicResponse.operation. -message DescribeTopicResult { - // Description of scheme object. - Ydb.Scheme.Entry self = 1; - // Settings of topic. - TopicSettings settings = 2; +// Drop topic result message that will be inside DropTopicResponse.operation. +message DropTopicResult { } |