summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorildar-khisam <[email protected]>2022-06-16 19:44:06 +0300
committerildar-khisam <[email protected]>2022-06-16 19:44:06 +0300
commitb4f35ae1d3583d06a2997ff4063c6613e1c7aaab (patch)
tree681a147c3a46a8b650a18ea46610c5dc51464bc9
parenta6f15ab7c05a0b9968f8095bef8663d3efdd9fd5 (diff)
topic service public grpc api
fix control messages issues ref:5ca10d7c0bffb0760bd3784e0b08cb8dd2e55ed4
-rw-r--r--ydb/public/api/grpc/draft/ydb_topic_v1.proto21
-rw-r--r--ydb/public/api/protos/CMakeLists.txt1
-rw-r--r--ydb/public/api/protos/ydb_topic.proto305
3 files changed, 188 insertions, 139 deletions
diff --git a/ydb/public/api/grpc/draft/ydb_topic_v1.proto b/ydb/public/api/grpc/draft/ydb_topic_v1.proto
index 23219ef6eff..948b8e36e67 100644
--- a/ydb/public/api/grpc/draft/ydb_topic_v1.proto
+++ b/ydb/public/api/grpc/draft/ydb_topic_v1.proto
@@ -64,26 +64,17 @@ service TopicService {
rpc StreamRead(stream StreamReadMessage.FromClient) returns (stream StreamReadMessage.FromServer);
- // Describe topic command.
- rpc DescribeTopic(DescribeTopicRequest) returns (DescribeTopicResponse);
-
-
- // Drop topic command.
- rpc DropTopic(DropTopicRequest) returns (DropTopicResponse);
-
-
// Create topic command.
rpc CreateTopic(CreateTopicRequest) returns (CreateTopicResponse);
- // Alter topic command.
- rpc AlterTopic(AlterTopicRequest) returns (AlterTopicResponse);
+ // Describe topic command.
+ rpc DescribeTopic(DescribeTopicRequest) returns (DescribeTopicResponse);
- // Add consumer command.
- rpc AddConsumer(AddConsumerRequest) returns (AddConsumerResponse);
+ // Alter topic command.
+ rpc AlterTopic(AlterTopicRequest) returns (AlterTopicResponse);
- // Drop consumer command.
- rpc DropConsumer(RemoveConsumerRequest) returns (RemoveConsumerResponse);
-}
+ // Drop topic command.
+ rpc DropTopic(DropTopicRequest) returns (DropTopicResponse);
diff --git a/ydb/public/api/protos/CMakeLists.txt b/ydb/public/api/protos/CMakeLists.txt
index 0f093e35576..a573bf200a4 100644
--- a/ydb/public/api/protos/CMakeLists.txt
+++ b/ydb/public/api/protos/CMakeLists.txt
@@ -45,7 +45,6 @@ target_proto_messages(api-protos PRIVATE
${CMAKE_SOURCE_DIR}/ydb/public/api/protos/ydb_table.proto
${CMAKE_SOURCE_DIR}/ydb/public/api/protos/ydb_value.proto
${CMAKE_SOURCE_DIR}/ydb/public/api/protos/ydb_s3_internal.proto
- ${CMAKE_SOURCE_DIR}/ydb/public/api/protos/ydb_topic.proto
${CMAKE_SOURCE_DIR}/ydb/public/api/protos/yq.proto
)
generate_enum_serilization(api-protos
diff --git a/ydb/public/api/protos/ydb_topic.proto b/ydb/public/api/protos/ydb_topic.proto
index 8d940a5d029..507d1222346 100644
--- a/ydb/public/api/protos/ydb_topic.proto
+++ b/ydb/public/api/protos/ydb_topic.proto
@@ -22,6 +22,14 @@ enum Codec {
CODEC_LZOP = 3;
CODEC_ZSTD = 4;
CODEC_CUSTOM = 10000;
+ reserved 20000 to max;
+}
+
+// Description of supported codecs.
+message SupportedCodecs {
+ // List of supported codecs.
+ // See enum Codec above for values.
+ repeated int32 codecs = 1;
}
// Represents range [start, end).
@@ -40,6 +48,11 @@ message UpdateTokenRequest {
message UpdateTokenResponse {
}
+
+////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+// StreamWrite
+
+
// Messages for bidirectional streaming rpc StreamWrite
message StreamWriteMessage {
// Client-server message for write session. Contains one of:
@@ -99,7 +112,8 @@ message StreamWriteMessage {
// Identifier of partition that is matched for this write session.
int64 partition_id = 3;
- // Client can only use compression codecs from this set to write messages to topic, session will be closed with BAD_REQUEST otherwise.
+ // Client can only use compression codecs from this set to write messages to topic.
+ // Otherwise session will be closed with BAD_REQUEST.
SupportedCodecs supported_codecs = 4;
}
@@ -109,7 +123,7 @@ message StreamWriteMessage {
// Codec that is used for data compression.
// See enum Codec above for values.
- int64 codec = 2;
+ int32 codec = 2;
message MessageData {
// Message sequence number, provided by client for deduplication.
@@ -131,7 +145,8 @@ message StreamWriteMessage {
repeated WriteAck acks = 1;
// Assigned partition for all client messages inside this batch.
- // This actual partition may differ from that returned in InitResponse or other WriteResponses in this write session.
+ // This actual partition may differ from that returned in InitResponse
+ // or other WriteResponses in this write session.
int64 partition_id = 2;
// Write statistics for this sequence of client messages.
@@ -154,13 +169,12 @@ message StreamWriteMessage {
}
message Skipped {
+ Reason reason = 1;
+
enum Reason {
REASON_UNSPECIFIED = 0;
REASON_ALREADY_WRITTEN = 1;
}
-
- // See enum Reason above for values.
- int64 reason = 1;
}
}
@@ -180,6 +194,11 @@ message StreamWriteMessage {
}
}
+
+////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+// StreamRead
+
+
// Within a StreamRead session delivered messages are separated by partition.
// Reads from a single partition are represented by a partition session.
message PartitionSession {
@@ -191,7 +210,7 @@ message PartitionSession {
int64 partition_id = 3;
}
-// Messages for bidirectional streaming rpc StreamingRead
+// Messages for bidirectional streaming rpc StreamRead
message StreamReadMessage {
// Client-server message for read session. Contains one of:
// InitRequest - handshake request.
@@ -267,6 +286,7 @@ message StreamReadMessage {
// Zero means infinite lag.
google.protobuf.Duration max_lag = 3;
// Read data only after this timestamp from this topic.
+ // Read only messages with 'written_at' value greater or equal than this timestamp.
google.protobuf.Timestamp read_from = 4;
}
}
@@ -280,19 +300,24 @@ message StreamReadMessage {
// Message that represents client readiness for receiving more data.
message ReadRequest {
// Server and client each keep track of total bytes size of all ReadResponses.
- // When client is ready to receive N more bytes in responses (to increment possible total by N), it sends a ReadRequest with bytes_size = N.
- //
- // Example:
- // 1) Let client have 200 B buffer. It sends ReadRequest with bytes_size = 200;
- // 2) Server returns ReadResponse with bytes_size = 150; now client buffer has 50 free bytes, server is free to send up to 50 bytes in responses.
- // 3) Client processes 100 bytes from buffer, now buffer free space is 150 bytes. Client sends ReadRequest with bytes_size = 100;
- // 4) Server is free to send up to 50 + 100 = 150 bytes. It may, for example,
- // send one 70 B response and another 80 B response without receiving additional requests.
- //
+ // When client is ready to receive N more bytes in responses (to increment possible total by N),
+ // it sends a ReadRequest with bytes_size = N.
+ // bytes_size value must be positive.
// So in expression 'A = (sum of bytes_size in all ReadRequests) - (sum of bytes_size in all ReadResponses)'
// server will keep A (available size for responses) non-negative.
// But there is an exception. If server receives ReadRequest, and the first message in response exceeds A -
- // then it will still be delivered, and A will become negative until client sends enough additional ReadRequests.
+ // then it will still be delivered, and A will become negative until enough additional ReadRequests.
+ //
+ // Example:
+ // 1) Let client have 200 bytes buffer. It sends ReadRequest with bytes_size = 200;
+ // 2) Server may return one ReadResponse with bytes_size = 70 and than another 80 bytes response;
+ // now client buffer has 50 free bytes, server is free to send up to 50 bytes in responses.
+ // 3) Client processes 100 bytes from buffer, now buffer free space is 150 bytes,
+ // so client sends ReadRequest with bytes_size = 100;
+ // 4) Server is free to send up to 50 + 100 = 150 bytes. But the next read message is too big,
+ // and it sends 160 bytes ReadResponse.
+ // 5) Let's assume client somehow processes it, and its 200 bytes buffer is free again.
+ // It shoud account for excess 10 bytes and send ReadRequest with bytes_size = 210.
int64 bytes_size = 1;
}
@@ -328,7 +353,7 @@ message StreamReadMessage {
// Codec that is used for data compression.
// See enum Codec above for values.
- int64 codec = 4;
+ int32 codec = 4;
// Persist timestamp on server for batch.
google.protobuf.Timestamp written_at = 5;
@@ -390,8 +415,7 @@ message StreamReadMessage {
// Partition contains messages with offsets in range [start, end).
OffsetsRange partition_offsets = 2;
- // Upper bound for committed offsets.
- // TODO example (here or in CommitRequest)
+ // Each offset up to and including (committed_offset - 1) was committed.
int64 committed_offset = 3;
// Write timestamp of next message written to this partition will be no less than write_time_high_watermark.
@@ -399,13 +423,12 @@ message StreamReadMessage {
}
// Command from server to create and start a partition session.
- // Client must react on this command by sending StartPartitionSessionResponse when ready to recieve data from this partition.
+ // Client must respond with StartPartitionSessionResponse when ready to receive data from this partition.
message StartPartitionSessionRequest {
// Partition session description.
PartitionSession partition_session = 1;
- // Actual upper bound for all committed offsets.
- // I.e. each offset up to and including (committed_offset - 1) was committed.
+ // Each offset up to and including (committed_offset - 1) was committed.
int64 committed_offset = 2;
// Partition contains messages with offsets in range [start, end).
@@ -419,17 +442,16 @@ message StreamReadMessage {
// Reads in this partition session will start from offset no less than read_offset.
// If read_offset is set, server will check that read_offset is no less that actual committed offset.
- // If check will fail then server will send a message describing error in status and issues fields, then close stream.
+ // If check fails then server will send an error message (status != SUCCESS) and close stream.
//
// If read_offset is not set, no check will be made.
// InitRequest.max_lag and InitRequest.read_from could lead to skip of more messages.
// Server will return data starting from offset that is maximum of actual committed offset, read_offset (if set)
// and offsets calculated from InitRequest.max_lag and InitRequest.read_from.
-// optional int64 read_offset = 2;
- google.protobuf.Int64Value read_offset = 2;
- // All messages with offset less than commit_offset are processed by client. Server will commit this position if this is not done yet.
- //optional int64 commit_offset = 3;
- google.protobuf.Int64Value commit_offset = 3;
+ optional int64 read_offset = 2;
+ // All messages with offset less than commit_offset are processed by client.
+ // Server will commit this position if this is not done yet.
+ optional int64 commit_offset = 3;
}
// Command from server to stop and destroy concrete partition session.
@@ -438,18 +460,19 @@ message StreamReadMessage {
int64 partition_session_id = 1;
// Flag of graceful stop.
- // If True then server is waiting for response from client before giving of this partition for other read session.
+ // If set, server will wait for response from client before giving this partition to other read session.
// Server will not send more data from this partition.
// Client can process all received data and wait for commit and only after send response.
// If False then server gives partition for other session right now.
- // All further commits for this PartitionSession has no effect. Server is not waiting for response.
+ // All further commits for this partition session has no effect. Server is not waiting for response.
bool graceful = 2;
// Upper bound for committed offsets.
int64 committed_offset = 3;
}
- // Signal for server that client finished working with this partition. Must be sent only after corresponding StopPartitionSessionRequest from server.
+ // Signal for server that client finished working with this partition.
+ // Must be sent only after corresponding StopPartitionSessionRequest from server.
// Server will give this partition to other read session only after StopPartitionSessionResponse signal.
message StopPartitionSessionResponse {
// Partition session identifier of partition session that is released by client.
@@ -458,30 +481,9 @@ message StreamReadMessage {
}
-// Drop topic request sent from client to server.
-message DropTopicRequest {
- Ydb.Operations.OperationParams operation_params = 1;
- // Topic path.
- string path = 2;
-}
-
+////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+// Control messages
-// Drop topic response sent from server to client. If topic is not existed then response status will be "SCHEME_ERROR".
-message DropTopicResponse {
- // Result of request will be inside operation.
- Ydb.Operations.Operation operation = 1;
-}
-
-// Drop topic result message that will be inside DropTopicResponse.operation.
-message DropTopicResult {
-}
-
-// Description of supported codecs.
-message SupportedCodecs {
- // List of supported codecs.
- // See enum Codec above for values.
- repeated int32 codecs = 1;
-}
// Consumer description.
message Consumer {
@@ -502,7 +504,6 @@ message Consumer {
map<string, string> attributes = 6;
}
-
// Consumer alter description.
message AlterConsumer {
// Must have valid not empty name as a key.
@@ -510,8 +511,7 @@ message AlterConsumer {
// Consumer may be marked as 'important'. It means messages for this consumer will never expire due to retention.
// User should take care that such consumer never stalls, to prevent running out of disk space.
// Flag that this consumer is important.
-// optional bool important = 2;
- google.protobuf.BoolValue important = 2;
+ optional bool important = 2;
// All messages with smaller server written_at timestamp will be skipped.
google.protobuf.Timestamp read_from = 3;
reserved 4; // supported_format
@@ -525,47 +525,50 @@ message AlterConsumer {
}
-// Message for describing topic internals.
-message TopicSettings {
- oneof partitions {
+// Create topic request sent from client to server.
+message CreateTopicRequest {
+ Ydb.Operations.OperationParams operation_params = 1;
+ // Topic path.
+ string path = 2;
+
+ // Topic settings.
+ oneof partitioning {
+ // Uniform partitioning case.
// How many uniform partitions in topic. Must less than database limit. Default limit - 10.
- int32 partitions_count = 1;
- // 2 reserved for partition_at_keys
+ int64 partitions_count = 3;
+ // 4 reserved for partition_at_keys
}
+ reserved 4; // partition_at_keys inside partitions oneof
+
+ // Retention settings.
+ // Currently, only one limit may be set, so other should not be set.
+
// How long data in partition should be stored. Must be greater than 0 and less than limit for this database.
// Default limit - 36 hours.
- google.protobuf.Duration retention_period = 3;
- // How much data in partition should be stored. Must be greater than 0 and less than limit for this database. Zero means infinite limit.
- int64 retention_storage_mb = 4;
- reserved 5; // supported_format.
+ google.protobuf.Duration retention_period = 5;
+ // How much data in partition should be stored. Must be greater than 0 and less than limit for this database.
+ // Zero value means infinite limit.
+ int64 retention_storage_mb = 6;
+ reserved 7; // supported_format.
// List of allowed codecs for writers.
// Writes with codec not from this list are forbidden.
- SupportedCodecs supported_codecs = 6;
- // Partition write speed in bytes per second. Must be less than database limit. Default limit - 1 MB/s - used when value is zero.
- int64 partition_write_speed_bytes_per_second = 7;
- // Burst size for write in partition, in bytes. Must be less than database limit. Default limit - 1 MB - used when value is zero.
- int64 partition_write_burst_bytes = 8;
+ SupportedCodecs supported_codecs = 8;
+ // Partition write speed in bytes per second. Must be less than database limit.
+ // Zero value means default limit: 1 MB per second.
+ int64 partition_write_speed_bytes_per_second = 9;
+ // Burst size for write in partition, in bytes. Must be less than database limit.
+ // Zero value means default limit: 1 MB.
+ int64 partition_write_burst_bytes = 10;
// User and server attributes of topic. Server attributes starts from "_" and will be validated by server.
- map<string, string> attributes = 9;
+ map<string, string> attributes = 11;
// List of consumers for this topic.
- repeated Consumer consumers = 10;
-
-}
-
-// Create topic request sent from client to server.
-message CreateTopicRequest {
- Ydb.Operations.OperationParams operation_params = 1;
- // Topic path.
- string path = 2;
-
- // Topic settings.
- TopicSettings settings = 3;
+ repeated Consumer consumers = 12;
}
-
-// Create topic response sent from server to client. If topic is already exists then response status will be "ALREADY_EXISTS".
+// Create topic response sent from server to client.
+// If topic is already exists then response status will be "ALREADY_EXISTS".
message CreateTopicResponse {
// Result of request will be inside operation.
Ydb.Operations.Operation operation = 1;
@@ -575,78 +578,134 @@ message CreateTopicResponse {
message CreateTopicResult {
}
-// Update existing topic request sent from client to server.
-message AlterTopicRequest {
+
+// Describe topic request sent from client to server.
+message DescribeTopicRequest {
Ydb.Operations.OperationParams operation_params = 1;
+
// Topic path.
string path = 2;
- // New topic settings to be set. All options inside should be set despite same value.
- TopicSettings settings = 4;
}
-// Update topic response sent from server to client.
-message AlterTopicResponse {
+// Describe topic response sent from server to client.
+// If topic is not existed then response status will be "SCHEME_ERROR".
+message DescribeTopicResponse {
// Result of request will be inside operation.
Ydb.Operations.Operation operation = 1;
}
-// Update topic result message that will be inside UpdateTopicResponse.operation.
-message AlterTopicResult {
- oneof partitions {
+// Describe topic result message that will be inside DescribeTopicResponse.operation.
+message DescribeTopicResult {
+ // Description of scheme object.
+ Ydb.Scheme.Entry self = 1;
+
+ // Settings of topic.
+
+ // Partitioning description.
+ oneof partitioning {
+ // Case of 'partitions_count' uniform partitions in topic.
+ int64 partitions_count = 2;
+ // 3 reserved for partition_at_keys
+ }
+ reserved 3; // partition_at_keys inside partitions oneof
+
+ // Retention settings.
+ // Currently, only one limit may be set, so other should not be set.
+
+ // How long data in partition should be stored.
+ google.protobuf.Duration retention_period = 4;
+ // How much data in partition should be stored.
+ // Zero value means infinite limit.
+ int64 retention_storage_mb = 5;
+
+ reserved 6; // supported_format.
+ // List of allowed codecs for writers.
+ // Writes with codec not from this list are forbidden.
+ SupportedCodecs supported_codecs = 7;
+ // Partition write speed in bytes per second.
+ // Zero value means default limit: 1 MB per second.
+ int64 partition_write_speed_bytes_per_second = 8;
+ // Burst size for write in partition, in bytes.
+ // Zero value means default limit: 1 MB.
+ int64 partition_write_burst_bytes = 9;
+
+ // User and server attributes of topic. Server attributes starts from "_" and will be validated by server.
+ map<string, string> attributes = 10;
+
+ // List of consumers for this topic.
+ repeated Consumer consumers = 11;
+}
+
+
+// Update existing topic request sent from client to server.
+message AlterTopicRequest {
+ Ydb.Operations.OperationParams operation_params = 1;
+ // Topic path.
+ string path = 2;
+
+ // Partitioning setting.
+ oneof partitioning {
// How many uniform partitions in topic. Must less than database limit. Default limit - 10.
- int32 partitions_count = 1;
- // 2 reserved for partition_at_keys
+ int64 partitions_count = 3;
+ // 4 reserved for partition_at_keys
}
+ reserved 4; // partition_at_keys inside partitions oneof
+
+ // Retention settings.
+ // Currently, only one limit may be set, so other should not be set.
+
// How long data in partition should be stored. Must be greater than 0 and less than limit for this database.
// Default limit - 36 hours.
- google.protobuf.Duration retention_period = 3;
+ google.protobuf.Duration retention_period = 5;
// How much data in partition should be stored. Must be greater than 0 and less than limit for this database.
-// optional int64 retention_storage_mb = 4;
- google.protobuf.Int64Value retention_storage_mb = 4;
- reserved 5; // supported_format.
+ optional int64 retention_storage_mb = 6;
+ reserved 7; // supported_format.
// List of allowed codecs for writers.
- // See enum Codec above for values.
// Writes with codec not from this list are forbidden.
- SupportedCodecs supported_codecs = 6;
+ SupportedCodecs supported_codecs = 8;
// Partition write speed in bytes per second. Must be less than database limit. Default limit - 1 MB/s.
-// optional int64 partition_write_speed_bytes_per_second = 7;
- google.protobuf.Int64Value partition_write_speed_bytes_per_second = 7;
+ optional int64 partition_write_speed_bytes_per_second = 9;
// Burst size for write in partition, in bytes. Must be less than database limit. Default limit - 1 MB.
-// optional int64 partition_write_burst_bytes = 8;
- google.protobuf.Int64Value partition_write_burst_bytes = 8;
+ optional int64 partition_write_burst_bytes = 10;
// User and server attributes of topic. Server attributes starts from "_" and will be validated by server.
// Leave the value blank to drop an attribute.
- map<string, string> alter_attributes = 9;
+ map<string, string> alter_attributes = 11;
// Add consumers.
- repeated Consumer add_consumers = 10;
+ repeated Consumer add_consumers = 12;
// Remove consumers (by its names)
- repeated string drop_consumers = 11;
+ repeated string drop_consumers = 13;
// Alter consumers
- repeated AlterConsumer alter_consumers = 12;
+ repeated AlterConsumer alter_consumers = 14;
}
+// Update topic response sent from server to client.
+message AlterTopicResponse {
+ // Result of request will be inside operation.
+ Ydb.Operations.Operation operation = 1;
+}
+
+// Update topic result message that will be inside UpdateTopicResponse.operation.
+message AlterTopicResult {
+}
-// Describe topic request sent from client to server.
-message DescribeTopicRequest {
- Ydb.Operations.OperationParams operation_params = 1;
+// Drop topic request sent from client to server.
+message DropTopicRequest {
+ Ydb.Operations.OperationParams operation_params = 1;
// Topic path.
string path = 2;
}
-// Describe topic response sent from server to client. If topic is not existed then response status will be "SCHEME_ERROR".
-message DescribeTopicResponse {
+// Drop topic response sent from server to client.
+// If topic not exists then response status will be "SCHEME_ERROR".
+message DropTopicResponse {
// Result of request will be inside operation.
Ydb.Operations.Operation operation = 1;
}
-// Describe topic result message that will be inside DescribeTopicResponse.operation.
-message DescribeTopicResult {
- // Description of scheme object.
- Ydb.Scheme.Entry self = 1;
- // Settings of topic.
- TopicSettings settings = 2;
+// Drop topic result message that will be inside DropTopicResponse.operation.
+message DropTopicResult {
}