1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
|
syntax = "proto3";
import "google/protobuf/descriptor.proto";
package NUnifiedAgentProto;
option java_package = "com.yandex.unified_agent";
option go_package = "a.yandex-team.ru/library/cpp/unified_agent_client/proto;unifiedagent";
// Custom file-level option, registered as extension field 66777 of
// google.protobuf.FileOptions.
// NOTE(review): the effect of this option is defined by whatever code
// generator/plugin reads it; that is not visible in this file.
extend google.protobuf.FileOptions {
  bool GenerateYaStyle = 66777;
}
// Message sent by the client over the Session stream. At most one member of
// the `request` oneof is set: either a session handshake (`initialize`) or a
// batch of messages (`data_batch`).
message Request {
  // A single name/value pair of session-level metadata.
  message SessionMetaItem {
    string name = 1;
    string value = 2;
  }

  // Handshake payload that starts a session.
  message Initialize {
    // Session id provided by the server; pass it again when reconnecting.
    string session_id = 1;

    // Session metadata.
    repeated SessionMetaItem meta = 2;

    // NOTE(review): not documented in this file; presumably a secret the
    // agent uses to authorize the client - confirm with the server side.
    string shared_secret_key = 3;
  }

  // Per-message metadata for one key, stored column-wise across a DataBatch.
  //
  // These are arbitrary key-value pairs that agent filters can use to
  // modify/filter messages or to route them to target outputs.
  //
  // Meta items of all messages are grouped by meta key: the key goes into
  // `key`, and the values go into the `value` sequence, which corresponds to
  // the payload sequence of DataBatch. Ranges of messages that have no meta
  // with this key are described by the skip_start/skip_length sequences.
  //
  // Example: [{m:v1}, {}, {}, {m: v2}, {}, {m: v3}, {}, {}] is encoded as
  //   key: 'm'
  //   value: ['v1', 'v2', 'v3']
  //   skip_start: [1, 4]
  //   skip_length: [2, 1]
  // NOTE(review): in the example the trailing messages without the key have
  // no skip entry, so trailing gaps appear to be implicit - confirm.
  message MessageMetaItem {
    // Meta key shared by every entry of `value`.
    string key = 1;

    // Values for the messages that do carry this key, in payload order.
    repeated string value = 2;

    // Start positions of the message ranges that lack this key.
    repeated uint32 skip_start = 3;

    // Lengths of those ranges; pairs up with `skip_start` (see the example).
    repeated uint32 skip_length = 4;
  }

  // A batch of messages.
  // NOTE(review): seq_no, timestamp and payload look like parallel sequences
  // (entry i of each describing message i) - confirm.
  message DataBatch {
    // Sequence numbers of the messages.
    repeated uint64 seq_no = 1;

    // Message timestamps, in microseconds.
    repeated uint64 timestamp = 2;

    // Message bodies.
    repeated bytes payload = 100;

    // Per-key metadata columns for the messages of this batch.
    repeated MessageMetaItem meta = 101;
  }

  oneof request {
    Initialize initialize = 1;
    DataBatch data_batch = 2;
  }
}
// Message sent by the server over the Session stream. At most one member of
// the `response` oneof is set: the handshake reply (`initialized`) or an
// acknowledgement (`ack`).
message Response {
  // Reply to Request.Initialize.
  message Initialized {
    // Session identifier, used for logging and deduplication.
    string session_id = 1;

    // The server has already consumed messages with seq_no up to
    // last_seq_no, so the application may skip every message it has formed
    // in that range.
    uint64 last_seq_no = 2;
  }

  // Acknowledgement sent back to the client.
  // NOTE(review): whether seq_no acknowledges a single record or everything
  // up to and including it is not stated in this file - confirm.
  message Ack {
    uint64 seq_no = 1;
  }

  oneof response {
    Initialized initialized = 1;
    Ack ack = 2;
  }
}
// gRPC service exposed by UnifiedAgent.
service UnifiedAgentService {
  // Bidirectional stream: the client sends a stream of Request messages and
  // receives a stream of Response messages.
  rpc Session(stream Request) returns (stream Response);
}
// dataflow:
// Request.initialize -> UnifiedAgent;
// specify session_id when this is a retry. Client can already have session_id from previous init response,
// or it can use some pregenerated session_id for each session.
// Response.initialized -> client;
// Request.data_batch -> UnifiedAgent;
// ....
// Response.ack -> client;
// when this record is consumed by UnifiedAgent with the chosen guarantees, UnifiedAgent will send ack to client;
// client can forget about this log record now
//
// grpc finish session -> client;
// something went wrong; client must reconnect and retry all records not yet acknowledged
//
// Exactly-once retries - when reconnecting, the client must provide the previous session_id and the same seq_no values
// for records - only in this case can UnifiedAgent deduplicate.
|