summaryrefslogtreecommitdiffstats
path: root/library/cpp
diff options
context:
space:
mode:
authorandybg <[email protected]>2026-04-24 12:15:16 +0300
committerandybg <[email protected]>2026-04-24 12:55:23 +0300
commitcd465393cf4f26e84ccc5554532cfbde88500f9a (patch)
treead9105142758f7b4bedaf6320cf96619c084d883 /library/cpp
parentf4706fc669888a8de9cc73356b9e15da1ce8d64a (diff)
gRPC inactivity watchdog only for negotiated protocol v1+
Unified Agent: новый протокол стриминга и изменения в клиенте Документ опирается на `library/cpp/unified_agent_client/proto/unified_agent.proto` и реализацию `TClientSession` в `client_impl.cpp`. --- ## 1. Новый протокол **Legacy** — сессия без согласования версии: в ответе `Initialized` поле `protocol_version` отсутствует или равно **0**. Поведение соответствует старому контракту до появления опциональных полей согласования. **Версионированный режим (v1 и выше)** — клиент объявляет максимально поддерживаемую версию (`accept_protocol_version`), агент возвращает **согласованную** версию `protocol_version` и опционально **opaque**-токен привязки сессии. Дальнейшие реконнекты с тем же `session_id` требуют передачи **доказательства** привязки (`session_binding_proof`), чтобы сервер мог отличить легитимное продолжение от подмены. Семантика потока данных не меняется: после `Initialized` идут `DataBatch` / `Ack`, ретраи по `seq_no` и дедупликация на стороне агента при совпадении `session_id` и повторной отправке записей. --- ### 1.2. Новые поля **Запрос `Request.Initialize`** | Поле | Тип | Назначение | |------|-----|------------| | `accept_protocol_version` | `optional uint32` | Верхняя граница версии, которую клиент готов использовать (стиль HTTP Accept). **Не задано или 0** — клиент не предлагает новый протокол (legacy-only). | | `session_binding_proof` | `bytes` | Доказательство привязки при реконнекте; используется, когда согласована версия **> 0** и у клиента есть токен от предыдущего `Initialized`. | Поля `session_id`, `meta`, `shared_secret_key` — как раньше; `session_id` при реконнекте передаётся для дедупликации. **Ответ `Response.Initialized`** | Поле | Тип | Назначение | |------|-----|------------| | `protocol_version` | `optional uint32` | Согласованная версия: **min**(accept клиента, максимум сервера). **Не задано или 0** — сессия считается **legacy**. | | `session_binding_token` | `bytes` | Непрозрачный токен: клиент обязан вернуть его в следующем `Initialize` как `session_binding_proof` при переподключении (для версий протокола **≥ 1**). | Остальное без изменений: `session_id`, `last_seq_no`. --- ### 1.3. Процесс и логика выбора версии 1. Клиент задаёт **`MaxAcceptProtocolVersion`** (в коде: `SetMaxAcceptProtocolVersion`). Значение **0** означает: поле `accept_protocol_version` в `Initialize` **не отправляется** — только legacy. 2. Если `MaxAcceptProtocolVersion > 0`, в первом (и последующих) сообщении `Initialize` выставляется `accept_protocol_version = MaxAcceptProtocolVersion`. 3. Агент вычисляет согласованную версию как **min**(`accept_protocol_version`, собственный потолок поддерживаемых версий) и возвращает её в `Initialized.protocol_version`. Если клиент не прислал accept (legacy-only) или сервер отвечает **0** / не задаёт поле — на клиенте фиксируется **legacy** (`NegotiatedProtocol` сбрасывается). 4. При успешном `Initialized` с **`protocol_version > 0`** клиент сохраняет число версии в `NegotiatedProtocol` и копирует `session_binding_token` во внутреннее состояние для следующих коннектов. #### Диаграмма обмена (Mermaid sequence) Первое подключение с согласованием версии и привязкой: ```mermaid sequenceDiagram participant C as Клиент (TClientSession) participant G as gRPC stream participant A as Unified Agent C->>G: открыть Session(stream Request / stream Response) C->>A: Request.initialize<br/>accept_protocol_version = N (если N>0)<br/>session_id (опц.)<br/>meta, shared_secret_key, … Note over A: min(N, server_max) → negotiated A->>C: Response.initialized<br/>session_id<br/>last_seq_no<br/>protocol_version = negotiated<br/>session_binding_token (opaque) C->>C: сохранить NegotiatedProtocol,<br/>SessionBindingToken, SessionId loop Данные C->>A: Request.data_batch (seq_no, payload, …) A->>C: Response.ack (seq_no) end Note over C,A: обрыв стрима → реконнект ``` Реконнект при согласованной версии **> 0**: ```mermaid sequenceDiagram participant C as Клиент participant A as Unified Agent C->>A: Request.initialize<br/>session_id = сохранённый<br/>accept_protocol_version = N<br/>session_binding_proof = прежний token<br/>… A->>C: Response.initialized<br/>session_id, last_seq_no,<br/>protocol_version, session_binding_token (может обновиться) C->>A: Request.data_batch … ``` --- ### 1.4. Восстановление сессии и обмен ключами - **Идентификатор сессии:** `session_id` приходит от сервера в `Initialized` и дальше передаётся в `Initialize` при реконнекте — основа для дедупликации и продолжения с теми же `seq_no`. - **Shared secret:** по-прежнему `shared_secret_key` в `Initialize` (если задан в параметрах клиента) — отдельный канал авторизации/проверки, не смешивается с биндингом стрима. - **Привязка сессии (protocol v1+):** после первого успешного `Initialized` с `protocol_version > 0` клиент хранит `session_binding_token`. При следующем `PrepareInitializeRequest`, если есть `NegotiatedProtocol > 0` и непустой токен, в запрос кладётся `session_binding_proof` (содержимое токена). Так сервер связывает новый gRPC-стрим с прежней логической сессией. - **Конфликт сессии:** при завершении gRPC-вызова с кодом **`ALREADY_EXISTS`** клиент **сбрасывает** `SessionId`, `NegotiatedProtocol` и `SessionBindingToken`, чтобы следующий коннект не повторял конфликтующий идентификатор и не слал устаревшее proof (см. `OnGrpcCallFinished` в `client_impl.cpp`). --- ### 1.5. Совместимость клиентов и серверов | Клиент | Сервер | Результат | |--------|--------|-----------| | `MaxAcceptProtocolVersion = 0` | любой | Поле `accept_protocol_version` не отправляется; ожидается legacy-ответ (`protocol_version` 0 / отсутствует). Биндинг по токену не используется. | | `MaxAcceptProtocolVersion > 0` | только legacy | Обычно `protocol_version` в ответе 0 или отсутствует — клиент остаётся в legacy для этой сессии. | | `MaxAcceptProtocolVersion > 0` | поддерживает v1+ | Согласуется конкретное число (например 1); включаются `session_binding_token` / `session_binding_proof` на реконнектах. | | Новый клиент | старый агент | Безопасный откат: нет обязательных новых полей в wire-format для legacy; сервер игнорирует неизвестные optional-поля (proto3). | Важно: поведение «только новый протокол» для отдельных механизмов (например принудительная отмена стрима по неактивности) в клиенте завязано на **фактически согласованную** версию **`NegotiatedProtocol > 0`**, а не только на настройку `MaxAcceptProtocolVersion`. --- ## 2. Изменения в клиенте — исправления и защита от регрессий Ниже — логика, связанная с новым протоколом и устойчивостью сессий (файл `client_impl.cpp`, заголовки `client.h` / `client_impl.h`). 1. **Согласование версии и биндинг** — `PrepareInitializeRequest` выставляет `accept_protocol_version` только при `MaxAcceptProtocolVersion > 0`; при наличии согласованной версии и токена добавляет `session_binding_proof`. `OnGrpcCallInitialized` выставляет `NegotiatedProtocol` и `SessionBindingToken` только если `protocol_version` задан и **> 0**, иначе очищает их (строгий legacy). 2. **Конфликт `ALREADY_EXISTS`** — при таком статусе завершения стрима сбрасываются `SessionId`, `NegotiatedProtocol` и `SessionBindingToken`, чтобы не зациклиться на неверной паре (session_id, proof) и не провоцировать повторные конфликты на стороне агента. 3. **Watchdog неактивности gRPC (`GrpcCallInactivityTimeout`)** — принудительное закрытие активного вызова (`BeginClose(true)`) и счётчик `GrpcCallsClosedByInactivity` выполняются **только** при `NegotiatedProtocol.Defined() && *NegotiatedProtocol > 0`. Для legacy и до первого успешного `Initialized` с ненулевой версией отмена по этому таймеру **не** выполняется; таймер перепланируется как раньше. Это устраняет нежелательное принудительное реконнект-поведение на транспортах, где ранее допускалось «молчание» без отмены (см. план по этому пути). 4. **Пост-fork дочерний процесс** — сброс `SessionId`, `NegotiatedProtocol`, `SessionBindingToken` вместе с очередями, чтобы дочерний процесс не унаследовал привязку чужой сессии. Документация в публичном API: комментарий к `SetGrpcCallInactivityTimeout` в `client.h` описывает ограничение по согласованной версии протокола. --- *При необходимости уточнения формулы `min(accept, server_max)` на стороне агента смотрите реализацию сервера в репозитории `logbroker/unified_agent` (обработка `Initialize` в gRPC-сессии).* commit_hash:9d5ef1cdc0faf793b4f56bfd2bafa362d7995ac5
Diffstat (limited to 'library/cpp')
-rw-r--r--library/cpp/unified_agent_client/client.h11
-rw-r--r--library/cpp/unified_agent_client/client_impl.cpp80
-rw-r--r--library/cpp/unified_agent_client/client_impl.h9
-rw-r--r--library/cpp/unified_agent_client/proto/unified_agent.proto17
4 files changed, 97 insertions, 20 deletions
diff --git a/library/cpp/unified_agent_client/client.h b/library/cpp/unified_agent_client/client.h
index 3828b1d3638..bc5372d217e 100644
--- a/library/cpp/unified_agent_client/client.h
+++ b/library/cpp/unified_agent_client/client.h
@@ -68,6 +68,8 @@ namespace NUnifiedAgent {
// Force-cancel and recreate active grpc stream if there are inflight messages,
// but no grpc callback activity for too long. This is a fail-safe for transport stalls.
+ // Only applies after a successful session init with negotiated protocol_version > 0;
+ // legacy sessions (no negotiated version) are not force-cancelled on inactivity.
//
// Default: 30 sec (use 0 to disable)
TClientParameters& SetGrpcCallInactivityTimeout(TDuration timeout) {
@@ -90,6 +92,13 @@ namespace NUnifiedAgent {
return *this;
}
+ // Max protocol version offered to the agent (HTTP Accept-style). 0 = legacy only (do not send accept_protocol_version).
+ // Default: DefaultMaxAcceptProtocolVersion (v1).
+ TClientParameters& SetMaxAcceptProtocolVersion(ui32 version) {
+ MaxAcceptProtocolVersion = version;
+ return *this;
+ }
+
// Client library sends messages to grpc in batches, this parameter
// establishes upper limit on the size of single batch in bytes.
// If you increase this value, don't forget to adjust max_receive_message_size (https://a.yandex-team.ru/arc/trunk/arcadia/logbroker/unified_agent/examples/all.yml?rev=6661788#L185)
@@ -129,6 +138,7 @@ namespace NUnifiedAgent {
static const size_t DefaultGrpcMaxMessageSize;
static const TDuration DefaultGrpcSendDelay;
static const TDuration DefaultGrpcCallInactivityTimeout;
+ static const ui32 DefaultMaxAcceptProtocolVersion;
public:
TString Uri;
@@ -142,6 +152,7 @@ namespace NUnifiedAgent {
bool EnableForkSupport;
size_t GrpcMaxMessageSize;
TIntrusivePtr<TClientCounters> Counters;
+ ui32 MaxAcceptProtocolVersion;
};
struct TSessionParameters {
diff --git a/library/cpp/unified_agent_client/client_impl.cpp b/library/cpp/unified_agent_client/client_impl.cpp
index 2eb38840e08..db7f3459267 100644
--- a/library/cpp/unified_agent_client/client_impl.cpp
+++ b/library/cpp/unified_agent_client/client_impl.cpp
@@ -400,6 +400,8 @@ namespace NUnifiedAgent::NPrivate {
Started = false;
SessionId.Clear();
+ NegotiatedProtocol.Clear();
+ SessionBindingToken.clear();
TrimmedCount = 0;
NextIndex = 0;
AckSeqNo.Clear();
@@ -515,14 +517,15 @@ namespace NUnifiedAgent::NPrivate {
if (timeout == TDuration::Zero()) {
return;
}
- if (ActiveGrpcCall &&
+ if (NegotiatedProtocol.Defined() && *NegotiatedProtocol > 0 &&
+ ActiveGrpcCall &&
!CloseStarted &&
Counters->InflightMessages.Val() > 0 &&
(Now() - TInstant::MicroSeconds(LastGrpcCallActivityUsec.load())) >= timeout)
{
- YLOG_ERR(Sprintf(
- "grpc call inactivity timeout reached [%s], cancelling active call for reconnect",
- timeout.ToString().c_str()));
+ YLOG_ERROR_T(
+ "grpc call inactivity timeout reached [{}], cancelling active call for reconnect",
+ timeout.ToString());
++Counters->GrpcCallsClosedByInactivity;
ActiveGrpcCall->BeginClose(true);
TouchGrpcCallActivity();
@@ -543,8 +546,8 @@ namespace NUnifiedAgent::NPrivate {
++Counters->ReceivedMessages;
Counters->ReceivedBytes += messageSize;
if (messageSize > Client->GetParameters().GrpcMaxMessageSize) {
- YLOG_ERR(Sprintf("message size [%zu] is greater than max grpc message size [%zu], message dropped",
- messageSize, Client->GetParameters().GrpcMaxMessageSize));
+ YLOG_ERROR_T("message size [{}] is greater than max grpc message size [{}], message dropped",
+ messageSize, Client->GetParameters().GrpcMaxMessageSize);
++Counters->DroppedMessages;
Counters->DroppedBytes += messageSize;
++Counters->ErrorsCount;
@@ -571,7 +574,7 @@ namespace NUnifiedAgent::NPrivate {
if (CloseRequested) {
g.Release();
- YLOG_ERR(Sprintf("session is closing, message dropped, [%zu] bytes", messageSize));
+ YLOG_ERROR_T("session is closing, message dropped, [{}] bytes", messageSize);
--Counters->InflightMessages;
Counters->InflightBytes -= messageSize;
++Counters->DroppedMessages;
@@ -740,9 +743,16 @@ namespace NUnifiedAgent::NPrivate {
void TClientSession::PrepareInitializeRequest(NUnifiedAgentProto::Request& target) {
auto& initializeMessage = *target.MutableInitialize();
+ const ui32 maxAccept = Client->GetParameters().MaxAcceptProtocolVersion;
+ if (maxAccept > 0) {
+ initializeMessage.set_accept_protocol_version(maxAccept);
+ }
if (SessionId.Defined()) {
initializeMessage.SetSessionId(*SessionId);
}
+ if (NegotiatedProtocol.Defined() && *NegotiatedProtocol > 0 && !SessionBindingToken.empty()) {
+ initializeMessage.set_session_binding_proof(SessionBindingToken);
+ }
if (Client->GetParameters().SharedSecretKey.Defined()) {
initializeMessage.SetSharedSecretKey(*Client->GetParameters().SharedSecretKey);
}
@@ -869,8 +879,10 @@ namespace NUnifiedAgent::NPrivate {
const auto addResult = requestBuilder.TryAddMessage(queueItem, *AckSeqNo + i + 1);
const size_t serializedLimitToLog = AgentMaxReceiveMessage.Defined() ? *AgentMaxReceiveMessage : 0;
if (addResult.LimitExceeded && target.GetDataBatch().SeqNoSize() == 0) {
- YLOG_ERR(Sprintf("single serialized message is too large [%zu] > [%zu], dropping it",
- addResult.NewSerializedRequestSize, serializedLimitToLog));
+ YLOG_ERROR_T(
+ "single serialized message is too large [{}] > [{}], dropping it",
+ addResult.NewSerializedRequestSize,
+ serializedLimitToLog);
queueItem.Skipped = true;
++Counters->DroppedMessages;
Counters->DroppedBytes += queueItem.Size;
@@ -944,9 +956,17 @@ namespace NUnifiedAgent::NPrivate {
YLOG_DEBUG(Sprintf("ack [%" PRIu64 "], [%zu] messages, [%zu] bytes", seqNo, messagesCount, bytesCount));
}
- void TClientSession::OnGrpcCallInitialized(const TString& sessionId, ui64 lastSeqNo) {
+ void TClientSession::OnGrpcCallInitialized(const TString& sessionId, ui64 lastSeqNo,
+ const TFMaybe<ui32>& protocolVersion, TString bindingToken) {
TouchGrpcCallActivity();
SessionId = sessionId;
+ if (protocolVersion.Defined() && *protocolVersion > 0) {
+ NegotiatedProtocol = *protocolVersion;
+ SessionBindingToken = std::move(bindingToken);
+ } else {
+ NegotiatedProtocol.Clear();
+ SessionBindingToken.clear();
+ }
Acknowledge(lastSeqNo);
NextIndex = TrimmedCount;
++Counters->GrpcCallsInitialized;
@@ -954,20 +974,31 @@ namespace NUnifiedAgent::NPrivate {
Counters->GrpcInflightBytes -= GrpcInflightBytes;
GrpcInflightMessages = 0;
GrpcInflightBytes = 0;
- YLOG_INFO(Sprintf("grpc call initialized, session_id [%s], last_seq_no [%" PRIu64 "]",
- sessionId.c_str(), lastSeqNo));
+ YLOG_INFO_F(
+ "grpc call initialized, session_id [{}], last_seq_no [{}], protocol_version [{}]",
+ sessionId,
+ lastSeqNo,
+ protocolVersion.Defined() ? ToString(*protocolVersion) : TString("legacy"));
}
- void TClientSession::OnGrpcCallFinished() {
+ void TClientSession::OnGrpcCallFinished(const grpc::Status& finishStatus) {
Y_ABORT_UNLESS(!Closed);
Y_ABORT_UNLESS(ActiveGrpcCall);
ActiveGrpcCall = nullptr;
+ if (!finishStatus.ok() && finishStatus.error_code() == grpc::StatusCode::ALREADY_EXISTS) {
+ SessionId.Clear();
+ NegotiatedProtocol.Clear();
+ SessionBindingToken.clear();
+ YLOG_WARNING_T(
+ "grpc session conflict (ALREADY_EXISTS), cleared session id, message [{}]",
+ TString{finishStatus.error_message()});
+ }
if (CloseStarted && (ForcedCloseStarted || WriteQueue.empty())) {
DoClose();
} else {
const auto reconnectTime = TInstant::Now() + Client->GetParameters().GrpcReconnectDelay;
MakeGrpcCallTimer->Set(reconnectTime);
- YLOG_INFO(Sprintf("grpc call delayed until [%s]", reconnectTime.ToString().c_str()));
+ YLOG_INFO_T("grpc call delayed until [{}]", reconnectTime.ToString());
}
}
@@ -1174,7 +1205,7 @@ namespace NUnifiedAgent::NPrivate {
Session.TouchGrpcCallActivity();
ReadPending = false;
if (FinishDone) {
- Session.OnGrpcCallFinished();
+ Session.OnGrpcCallFinished(FinishStatus);
return;
}
if (!ErrorOccured && status == EIOStatus::Error && WritesBlocked) {
@@ -1210,8 +1241,19 @@ namespace NUnifiedAgent::NPrivate {
static_cast<int>(Response.response_case())));
return;
}
- Session.OnGrpcCallInitialized(Response.GetInitialized().GetSessionId(),
- Response.GetInitialized().GetLastSeqNo());
+ {
+ const auto& init = Response.GetInitialized();
+ TFMaybe<ui32> protocolVersion;
+ if (init.has_protocol_version()) {
+ protocolVersion = init.protocol_version();
+ }
+ TString bindingToken;
+ if (!init.session_binding_token().empty()) {
+ bindingToken.assign(init.session_binding_token().data(), init.session_binding_token().size());
+ }
+ Session.OnGrpcCallInitialized(init.GetSessionId(), init.GetLastSeqNo(), protocolVersion,
+ std::move(bindingToken));
+ }
Initialized_ = true;
if (!WritePending) {
ScheduleWrite();
@@ -1254,7 +1296,7 @@ namespace NUnifiedAgent::NPrivate {
++Session.GetCounters().ErrorsCount;
}
if (!ReadPending) {
- Session.OnGrpcCallFinished();
+ Session.OnGrpcCallFinished(finishStatus);
}
}
@@ -1314,6 +1356,7 @@ namespace NUnifiedAgent {
, EnableForkSupport(false)
, GrpcMaxMessageSize(DefaultGrpcMaxMessageSize)
, Counters(nullptr)
+ , MaxAcceptProtocolVersion(DefaultMaxAcceptProtocolVersion)
{
}
@@ -1329,6 +1372,7 @@ namespace NUnifiedAgent {
const size_t TClientParameters::DefaultGrpcMaxMessageSize = 1_MB;
const TDuration TClientParameters::DefaultGrpcSendDelay = TDuration::MilliSeconds(10);
const TDuration TClientParameters::DefaultGrpcCallInactivityTimeout = TDuration::Seconds(30);
+ const ui32 TClientParameters::DefaultMaxAcceptProtocolVersion = 1;
TClientPtr MakeClient(const TClientParameters& parameters) {
diff --git a/library/cpp/unified_agent_client/client_impl.h b/library/cpp/unified_agent_client/client_impl.h
index 1f1cb052d3a..75c475876a3 100644
--- a/library/cpp/unified_agent_client/client_impl.h
+++ b/library/cpp/unified_agent_client/client_impl.h
@@ -9,6 +9,8 @@
#include <library/cpp/unified_agent_client/proto/unified_agent.grpc.pb.h>
#include <library/cpp/unified_agent_client/grpc_io.h>
+#include <contrib/libs/grpc/include/grpcpp/support/status.h>
+
#include <library/cpp/logger/global/global.h>
#include <util/generic/deque.h>
@@ -144,9 +146,10 @@ namespace NUnifiedAgent::NPrivate {
void Acknowledge(ui64 seqNo);
- void OnGrpcCallInitialized(const TString& sessionId, ui64 lastSeqNo);
+ void OnGrpcCallInitialized(const TString& sessionId, ui64 lastSeqNo, const TFMaybe<ui32>& protocolVersion,
+ TString bindingToken);
- void OnGrpcCallFinished();
+ void OnGrpcCallFinished(const grpc::Status& finishStatus);
NThreading::TFuture<void> PreFork();
@@ -255,6 +258,8 @@ namespace NUnifiedAgent::NPrivate {
TIntrusivePtr<TClient> Client;
TFMaybe<TString> OriginalSessionId;
TFMaybe<TString> SessionId;
+ TFMaybe<ui32> NegotiatedProtocol;
+ TString SessionBindingToken;
TFMaybe<THashMap<TString, TString>> Meta;
TScopeLogger Logger;
bool CloseStarted;
diff --git a/library/cpp/unified_agent_client/proto/unified_agent.proto b/library/cpp/unified_agent_client/proto/unified_agent.proto
index 68efe357476..f13cf372e0c 100644
--- a/library/cpp/unified_agent_client/proto/unified_agent.proto
+++ b/library/cpp/unified_agent_client/proto/unified_agent.proto
@@ -24,6 +24,12 @@ message Request {
repeated SessionMetaItem meta = 2;
string shared_secret_key = 3;
+
+ // Max protocol version the client supports (HTTP Accept-style). Unset or 0 = legacy only.
+ optional uint32 accept_protocol_version = 4;
+
+ // Proof of session binding for reconnect; used when negotiated protocol_version > 0.
+ bytes session_binding_proof = 5;
}
message MessageMetaItem {
@@ -66,6 +72,12 @@ message Response {
// Application can skip all formed messages by seq_no upto last_seq_no - they are consumed by server.
uint64 last_seq_no = 2;
+
+ // Negotiated protocol version (min(accept, server_max)). Unset or 0 = legacy session.
+ optional uint32 protocol_version = 3;
+
+ // Opaque token; client must send it back as session_binding_proof on reconnect (protocol v1+).
+ bytes session_binding_token = 4;
}
message Ack {
@@ -99,3 +111,8 @@ service UnifiedAgentService {
//
// Exactly once retries - when reconnect, client must provide previous session_id and same seq_no`s
// for records - only in this case UnifiedAgent can dedup.
+//
+// Protocol negotiation (optional fields):
+// - Initialize.accept_protocol_version: max version the client supports (0 / unset = legacy only).
+// - Initialized.protocol_version: negotiated min(accept, server_max); unset/0 = legacy session.
+// - Initialized.session_binding_token + Initialize.session_binding_proof: v1+ session binding (opaque).