summaryrefslogtreecommitdiffstats
path: root/yt/cpp/mapreduce/client/client_reader.cpp
diff options
context:
space:
mode:
authorandybg <[email protected]>2026-04-24 12:15:16 +0300
committerandybg <[email protected]>2026-04-24 12:55:23 +0300
commitcd465393cf4f26e84ccc5554532cfbde88500f9a (patch)
treead9105142758f7b4bedaf6320cf96619c084d883 /yt/cpp/mapreduce/client/client_reader.cpp
parentf4706fc669888a8de9cc73356b9e15da1ce8d64a (diff)
gRPC inactivity watchdog only for negotiated protocol v1+
Unified Agent: новый протокол стриминга и изменения в клиенте Документ опирается на `library/cpp/unified_agent_client/proto/unified_agent.proto` и реализацию `TClientSession` в `client_impl.cpp`. --- ## 1. Новый протокол **Legacy** — сессия без согласования версии: в ответе `Initialized` поле `protocol_version` отсутствует или равно **0**. Поведение соответствует старому контракту до появления опциональных полей согласования. **Версионированный режим (v1 и выше)** — клиент объявляет максимально поддерживаемую версию (`accept_protocol_version`), агент возвращает **согласованную** версию `protocol_version` и опционально **opaque**-токен привязки сессии. Дальнейшие реконнекты с тем же `session_id` требуют передачи **доказательства** привязки (`session_binding_proof`), чтобы сервер мог отличить легитимное продолжение от подмены. Семантика потока данных не меняется: после `Initialized` идут `DataBatch` / `Ack`, ретраи по `seq_no` и дедупликация на стороне агента при совпадении `session_id` и повторной отправке записей. --- ### 1.2. Новые поля **Запрос `Request.Initialize`** | Поле | Тип | Назначение | |------|-----|------------| | `accept_protocol_version` | `optional uint32` | Верхняя граница версии, которую клиент готов использовать (стиль HTTP Accept). **Не задано или 0** — клиент не предлагает новый протокол (legacy-only). | | `session_binding_proof` | `bytes` | Доказательство привязки при реконнекте; используется, когда согласована версия **> 0** и у клиента есть токен от предыдущего `Initialized`. | Поля `session_id`, `meta`, `shared_secret_key` — как раньше; `session_id` при реконнекте передаётся для дедупликации. **Ответ `Response.Initialized`** | Поле | Тип | Назначение | |------|-----|------------| | `protocol_version` | `optional uint32` | Согласованная версия: **min**(accept клиента, максимум сервера). **Не задано или 0** — сессия считается **legacy**. | | `session_binding_token` | `bytes` | Непрозрачный токен: клиент обязан вернуть его в следующем `Initialize` как `session_binding_proof` при переподключении (для версий протокола **≥ 1**). | Остальное без изменений: `session_id`, `last_seq_no`. --- ### 1.3. Процесс и логика выбора версии 1. Клиент задаёт **`MaxAcceptProtocolVersion`** (в коде: `SetMaxAcceptProtocolVersion`). Значение **0** означает: поле `accept_protocol_version` в `Initialize` **не отправляется** — только legacy. 2. Если `MaxAcceptProtocolVersion > 0`, в первом (и последующих) сообщении `Initialize` выставляется `accept_protocol_version = MaxAcceptProtocolVersion`. 3. Агент вычисляет согласованную версию как **min**(`accept_protocol_version`, собственный потолок поддерживаемых версий) и возвращает её в `Initialized.protocol_version`. Если клиент не прислал accept (legacy-only) или сервер отвечает **0** / не задаёт поле — на клиенте фиксируется **legacy** (`NegotiatedProtocol` сбрасывается). 4. При успешном `Initialized` с **`protocol_version > 0`** клиент сохраняет число версии в `NegotiatedProtocol` и копирует `session_binding_token` во внутреннее состояние для следующих коннектов. #### Диаграмма обмена (Mermaid sequence) Первое подключение с согласованием версии и привязкой: ```mermaid sequenceDiagram participant C as Клиент (TClientSession) participant G as gRPC stream participant A as Unified Agent C->>G: открыть Session(stream Request / stream Response) C->>A: Request.initialize<br/>accept_protocol_version = N (если N>0)<br/>session_id (опц.)<br/>meta, shared_secret_key, … Note over A: min(N, server_max) → negotiated A->>C: Response.initialized<br/>session_id<br/>last_seq_no<br/>protocol_version = negotiated<br/>session_binding_token (opaque) C->>C: сохранить NegotiatedProtocol,<br/>SessionBindingToken, SessionId loop Данные C->>A: Request.data_batch (seq_no, payload, …) A->>C: Response.ack (seq_no) end Note over C,A: обрыв стрима → реконнект ``` Реконнект при согласованной версии **> 0**: ```mermaid sequenceDiagram participant C as Клиент participant A as Unified Agent C->>A: Request.initialize<br/>session_id = сохранённый<br/>accept_protocol_version = N<br/>session_binding_proof = прежний token<br/>… A->>C: Response.initialized<br/>session_id, last_seq_no,<br/>protocol_version, session_binding_token (может обновиться) C->>A: Request.data_batch … ``` --- ### 1.4. Восстановление сессии и обмен ключами - **Идентификатор сессии:** `session_id` приходит от сервера в `Initialized` и дальше передаётся в `Initialize` при реконнекте — основа для дедупликации и продолжения с теми же `seq_no`. - **Shared secret:** по-прежнему `shared_secret_key` в `Initialize` (если задан в параметрах клиента) — отдельный канал авторизации/проверки, не смешивается с биндингом стрима. - **Привязка сессии (protocol v1+):** после первого успешного `Initialized` с `protocol_version > 0` клиент хранит `session_binding_token`. При следующем `PrepareInitializeRequest`, если есть `NegotiatedProtocol > 0` и непустой токен, в запрос кладётся `session_binding_proof` (содержимое токена). Так сервер связывает новый gRPC-стрим с прежней логической сессией. - **Конфликт сессии:** при завершении gRPC-вызова с кодом **`ALREADY_EXISTS`** клиент **сбрасывает** `SessionId`, `NegotiatedProtocol` и `SessionBindingToken`, чтобы следующий коннект не повторял конфликтующий идентификатор и не слал устаревшее proof (см. `OnGrpcCallFinished` в `client_impl.cpp`). --- ### 1.5. Совместимость клиентов и серверов | Клиент | Сервер | Результат | |--------|--------|-----------| | `MaxAcceptProtocolVersion = 0` | любой | Поле `accept_protocol_version` не отправляется; ожидается legacy-ответ (`protocol_version` 0 / отсутствует). Биндинг по токену не используется. | | `MaxAcceptProtocolVersion > 0` | только legacy | Обычно `protocol_version` в ответе 0 или отсутствует — клиент остаётся в legacy для этой сессии. | | `MaxAcceptProtocolVersion > 0` | поддерживает v1+ | Согласуется конкретное число (например 1); включаются `session_binding_token` / `session_binding_proof` на реконнектах. | | Новый клиент | старый агент | Безопасный откат: нет обязательных новых полей в wire-format для legacy; сервер игнорирует неизвестные optional-поля (proto3). | Важно: поведение «только новый протокол» для отдельных механизмов (например принудительная отмена стрима по неактивности) в клиенте завязано на **фактически согласованную** версию **`NegotiatedProtocol > 0`**, а не только на настройку `MaxAcceptProtocolVersion`. --- ## 2. Изменения в клиенте — исправления и защита от регрессий Ниже — логика, связанная с новым протоколом и устойчивостью сессий (файл `client_impl.cpp`, заголовки `client.h` / `client_impl.h`). 1. **Согласование версии и биндинг** — `PrepareInitializeRequest` выставляет `accept_protocol_version` только при `MaxAcceptProtocolVersion > 0`; при наличии согласованной версии и токена добавляет `session_binding_proof`. `OnGrpcCallInitialized` выставляет `NegotiatedProtocol` и `SessionBindingToken` только если `protocol_version` задан и **> 0**, иначе очищает их (строгий legacy). 2. **Конфликт `ALREADY_EXISTS`** — при таком статусе завершения стрима сбрасываются `SessionId`, `NegotiatedProtocol` и `SessionBindingToken`, чтобы не зациклиться на неверной паре (session_id, proof) и не провоцировать повторные конфликты на стороне агента. 3. **Watchdog неактивности gRPC (`GrpcCallInactivityTimeout`)** — принудительное закрытие активного вызова (`BeginClose(true)`) и счётчик `GrpcCallsClosedByInactivity` выполняются **только** при `NegotiatedProtocol.Defined() && *NegotiatedProtocol > 0`. Для legacy и до первого успешного `Initialized` с ненулевой версией отмена по этому таймеру **не** выполняется; таймер перепланируется как раньше. Это устраняет нежелательное принудительное реконнект-поведение на транспортах, где ранее допускалось «молчание» без отмены (см. план по этому пути). 4. **Пост-fork дочерний процесс** — сброс `SessionId`, `NegotiatedProtocol`, `SessionBindingToken` вместе с очередями, чтобы дочерний процесс не унаследовал привязку чужой сессии. Документация в публичном API: комментарий к `SetGrpcCallInactivityTimeout` в `client.h` описывает ограничение по согласованной версии протокола. --- *При необходимости уточнения формулы `min(accept, server_max)` на стороне агента смотрите реализацию сервера в репозитории `logbroker/unified_agent` (обработка `Initialize` в gRPC-сессии).* commit_hash:9d5ef1cdc0faf793b4f56bfd2bafa362d7995ac5
Diffstat (limited to 'yt/cpp/mapreduce/client/client_reader.cpp')
0 files changed, 0 insertions, 0 deletions