1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
|
syntax = "proto3";
import "google/protobuf/descriptor.proto";
import "ydb/public/api/protos/draft/persqueue_common.proto";
package NPersQueue;
option java_package = "com.yandex.ydb.persqueue";
option cc_enable_arenas = true;
extend google.protobuf.FileOptions {
bool GenerateYaStyle = 66677;
}
message Path {
// Path of object (topic/consumer).
string path = 1;
}
// WRITE REQUEST
message KeyValue {
string key = 1;
string value = 2;
}
message MapType {
repeated KeyValue items = 1;
}
/**
* Request for write session. Contains one of :
* Init - consists of initialization info - Topic, SourceId and so on
* Data - data to be writen
* DataBatch - batch of data to be written
*/
message WriteRequest {
message Init {
string topic = 1;
bytes source_id = 2;
MapType extra_fields = 7; //server and file inside here
uint64 proxy_cookie = 8; //cookie provided by ChooseProxy request //change to bytes
uint32 partition_group = 12; //Group to write to - 0 means any;
string version = 999; //must be filled by client lib
}
message Data {
uint64 seq_no = 1;
bytes data = 2;
uint64 create_time_ms = 3; //timestamp in ms
NPersQueueCommon.ECodec codec = 4;
uint32 uncompressed_size = 5;
}
message DataBatch {
repeated Data data = 1;
}
oneof request {
//init must be sent as first message
Init init = 1;
Data data = 2;
DataBatch data_batch = 3;
}
NPersQueueCommon.Credentials credentials = 20;
}
/**
* Response for write session. Contains one of :
* Error - in any error state - grpc errors, session dies, incorrect Init request and so on
* Init - contains SessionId of created session, MaxSeqNo and Partition
* Ack - acknowlegment of storing corresponding message
* AckBatch - acknowlegment of storing corresponding message batch
*/
message WriteResponse {
message Init {
uint64 max_seq_no = 1;
string session_id = 2;
uint32 partition = 3;
string topic = 4;
}
message Stat {
uint32 write_time_ms = 1;
uint32 total_time_in_partition_queue_ms = 2;
uint32 partition_quoted_time_ms = 3;
uint32 topic_quoted_time_ms = 4;
}
message Ack {
uint64 seq_no = 1;
uint64 offset = 2;
bool already_written = 3;
Stat stat = 4; //not filled in batch case
}
message AckBatch {
Stat stat = 2; //common statistics for batch storing
repeated Ack ack = 1;
}
oneof response {
Init init = 1;
Ack ack = 2;
AckBatch ack_batch = 4;
NPersQueueCommon.Error error = 3;
}
}
// READ REQUEST
/**
* Request for read session. Contains one of :
* Init - contains of Topics to be readed, ClientId and other metadata
* Read - request for read batch. Contains of restrictments for result - MaxSize, MaxCount and so on
* Commit - request for commit some read batches. Contains corresponding cookies
* Locked - comfirming to server that client is ready to get data from partition from concreet offset
*/
message ReadRequest {
enum EProtocolVersion {
Base = 0; // Base protocol version
Batching = 1; // Client supports more effective batching structs (TBatchedData instead of TData)
ReadParamsInInit = 2; // Client sets read params in Init request
}
message Init {
repeated string topics = 1;
bool read_only_local = 2; // ReadOnlyLocal=false - read mirrored topics from other clusters too; will be renamed to read_only_original
string client_id = 4;
bool clientside_locks_allowed = 5; //if true then partitions Lock signal will be sent from server,
//and reads from partitions will began only after Locked signal recieved by server from client
uint64 proxy_cookie = 6; //cookie provided by ChooseProxy request
bool balance_partition_right_now = 8; //if set then do not wait for commits from client on data from partition in case of balancing
repeated uint32 partition_groups = 9; //Groups to be read - if empty then read from all of them
uint32 idle_timeout_sec = 10; //TODO: do we need it?
uint32 commit_interval_ms = 12; // How often server must commit data. If client sends commits faster,
// then server will hold them in order to archive corresponding rate; zero means server default = 1sec
// Read request params
uint32 max_read_messages_count = 14; // Max messages to give to client in one read request
uint32 max_read_size = 15; // Max size in bytes to give to client in one read request
uint32 max_read_partitions_count = 16; // 0 means not matters // Maximum partitions count to give to client in one read request
uint32 max_time_lag_ms = 17; // Read data only with time lag less than or equal to specified
uint64 read_timestamp_ms = 18; // Read data only after this timestamp
bool commits_disabled = 19; // Client will never commit
string version = 999; //must be filled by client lib
// Protocol version to let server know about new features that client supports
uint32 protocol_version = 13; // version must be integer (not enum) because client may be newer than server
}
message Read {
// It is not allowed to change these parameters.
// They will be removed in future from TRead structure.
uint32 max_count = 1;
uint32 max_size = 2;
uint32 partitions_at_once = 3; //0 means not matters
uint32 max_time_lag_ms = 5;
uint64 read_timestamp_ms = 6; //read data only after this timestamp
}
message StartRead {
string topic = 1;
uint32 partition = 2;
uint64 read_offset = 3; //skip upto this position; if committed position is bigger, then do nothing
bool verify_read_offset = 4; //if true then check that committed position is <= ReadOffset; otherwise it means error in client logic
uint64 generation = 5;
uint64 commit_offset = 6; //all messages BEFORE this position are processed by client
}
message Commit {
repeated uint64 cookie = 1;
}
message Status {
uint64 generation = 1;
string topic = 2;
uint32 partition = 3;
}
oneof request {
//init must be sent as first message
Init init = 1;
Read read = 2;
StartRead start_read = 3;
Commit commit = 4;
Status status = 5;
}
NPersQueueCommon.Credentials credentials = 20;
}
message MessageMeta {
bytes source_id = 1;
uint64 seq_no = 2;
uint64 create_time_ms = 3;
uint64 write_time_ms = 4;
MapType extra_fields = 7;
NPersQueueCommon.ECodec codec = 8;
string ip = 9;
uint32 uncompressed_size = 10;
}
/**
* Response for read session. Contains one of :
* Error - in any error state - grpc errors, session dies, incorrect Init request and so on
* Init - contains SessionId of created session
* Data - result of read, contains of messages batch and cookie
* Commit - acknowlegment for commit
* Lock - informs client that server is ready to read data from corresponding partition
* Release - informs client that server will not get data from this partition in future read results, unless other Lock-Locked conversation will be done
*/
message ReadResponse {
message Init {
string session_id = 2; //for debug only
}
message Data {
message Message {
MessageMeta meta = 1; //SeqNo ...
bytes data = 2;
//unique value for clientside deduplication - Topic:Partition:Offset
uint64 offset = 3;
bytes broken_packed_data = 4; // TODO: move to pqlib
}
message MessageBatch {
string topic = 1;
uint32 partition = 2;
repeated Message message = 3;
}
repeated MessageBatch message_batch = 1;
uint64 cookie = 2; //Cookie to be committed by server
}
message BatchedData {
message MessageData {
NPersQueueCommon.ECodec codec = 2;
uint64 offset = 3; //unique value for clientside deduplication - Topic:Partition:Offset
uint64 seq_no = 4;
uint64 create_time_ms = 5;
uint64 uncompressed_size = 6;
bytes data = 1;
}
message Batch {
bytes source_id = 2;
MapType extra_fields = 3;
uint64 write_time_ms = 4;
string ip = 5;
repeated MessageData message_data = 1;
}
message PartitionData {
string topic = 2;
uint32 partition = 3;
repeated Batch batch = 1;
}
uint64 cookie = 2; //Cookie to be committed by server
repeated PartitionData partition_data = 1; //not greater than one PartitionData for each partition
}
message Lock {
string topic = 1;
uint32 partition = 2;
uint64 read_offset = 3; //offset to read from
uint64 end_offset = 4; //know till this time end offset
uint64 generation = 5;
}
message Release {
string topic = 1;
uint32 partition = 2;
bool can_commit = 3; //if CanCommit=false then you can not store progress of processing data for that partition at server;
//all commits will have no effect for this partition
//if you rely on committing offsets then just drop all data for this partition without processing - another session will get them later
//if CanCommit=true and you are relying on committing offsets - you can process all data for this partition you got,
//commit cookies and be sure that no other session will ever get this data
uint64 generation = 4;
}
message Commit {
repeated uint64 cookie = 1; //for debug purposes only
}
// Response for status requst.
message PartitionStatus {
uint64 generation = 1;
string topic = 2;
uint32 partition = 3;
uint64 committed_offset = 4;
uint64 end_offset = 5;
uint64 write_watermark_ms = 6;
}
oneof response {
Init init = 1;
Data data = 2;
BatchedData batched_data = 7;
NPersQueueCommon.Error error = 3;
Lock lock = 4;
Release release = 5;
Commit commit = 6;
PartitionStatus partition_status = 8;
}
}
|