diff options
| author | andybg <[email protected]> | 2026-04-24 12:15:16 +0300 |
|---|---|---|
| committer | andybg <[email protected]> | 2026-04-24 12:55:23 +0300 |
| commit | cd465393cf4f26e84ccc5554532cfbde88500f9a (patch) | |
| tree | ad9105142758f7b4bedaf6320cf96619c084d883 /yt/cpp/mapreduce/client/client_reader.cpp | |
| parent | f4706fc669888a8de9cc73356b9e15da1ce8d64a (diff) | |
gRPC inactivity watchdog only for negotiated protocol v1+
Unified Agent: новый протокол стриминга и изменения в клиенте
Документ опирается на `library/cpp/unified_agent_client/proto/unified_agent.proto` и реализацию `TClientSession` в `client_impl.cpp`.
---
## 1. Новый протокол
**Legacy** — сессия без согласования версии: в ответе `Initialized` поле `protocol_version` отсутствует или равно **0**. Поведение соответствует старому контракту до появления опциональных полей согласования.
**Версионированный режим (v1 и выше)** — клиент объявляет максимально поддерживаемую версию (`accept_protocol_version`), агент возвращает **согласованную** версию `protocol_version` и опционально **opaque**-токен привязки сессии. Дальнейшие реконнекты с тем же `session_id` требуют передачи **доказательства** привязки (`session_binding_proof`), чтобы сервер мог отличить легитимное продолжение от подмены.
Семантика потока данных не меняется: после `Initialized` идут `DataBatch` / `Ack`, ретраи по `seq_no` и дедупликация на стороне агента при совпадении `session_id` и повторной отправке записей.
---
### 1.2. Новые поля
**Запрос `Request.Initialize`**
| Поле | Тип | Назначение |
|------|-----|------------|
| `accept_protocol_version` | `optional uint32` | Верхняя граница версии, которую клиент готов использовать (стиль HTTP Accept). **Не задано или 0** — клиент не предлагает новый протокол (legacy-only). |
| `session_binding_proof` | `bytes` | Доказательство привязки при реконнекте; используется, когда согласована версия **> 0** и у клиента есть токен от предыдущего `Initialized`. |
Поля `session_id`, `meta`, `shared_secret_key` — как раньше; `session_id` при реконнекте передаётся для дедупликации.
**Ответ `Response.Initialized`**
| Поле | Тип | Назначение |
|------|-----|------------|
| `protocol_version` | `optional uint32` | Согласованная версия: **min**(accept клиента, максимум сервера). **Не задано или 0** — сессия считается **legacy**. |
| `session_binding_token` | `bytes` | Непрозрачный токен: клиент обязан вернуть его в следующем `Initialize` как `session_binding_proof` при переподключении (для версий протокола **≥ 1**). |
Остальное без изменений: `session_id`, `last_seq_no`.
---
### 1.3. Процесс и логика выбора версии
1. Клиент задаёт **`MaxAcceptProtocolVersion`** (в коде: `SetMaxAcceptProtocolVersion`). Значение **0** означает: поле `accept_protocol_version` в `Initialize` **не отправляется** — только legacy.
2. Если `MaxAcceptProtocolVersion > 0`, в первом (и последующих) сообщении `Initialize` выставляется `accept_protocol_version = MaxAcceptProtocolVersion`.
3. Агент вычисляет согласованную версию как **min**(`accept_protocol_version`, собственный потолок поддерживаемых версий) и возвращает её в `Initialized.protocol_version`. Если клиент не прислал accept (legacy-only) или сервер отвечает **0** / не задаёт поле — на клиенте фиксируется **legacy** (`NegotiatedProtocol` сбрасывается).
4. При успешном `Initialized` с **`protocol_version > 0`** клиент сохраняет число версии в `NegotiatedProtocol` и копирует `session_binding_token` во внутреннее состояние для следующих коннектов.
#### Диаграмма обмена (Mermaid sequence)
Первое подключение с согласованием версии и привязкой:
```mermaid
sequenceDiagram
participant C as Клиент (TClientSession)
participant G as gRPC stream
participant A as Unified Agent
C->>G: открыть Session(stream Request / stream Response)
C->>A: Request.initialize<br/>accept_protocol_version = N (если N>0)<br/>session_id (опц.)<br/>meta, shared_secret_key, …
Note over A: min(N, server_max) → negotiated
A->>C: Response.initialized<br/>session_id<br/>last_seq_no<br/>protocol_version = negotiated<br/>session_binding_token (opaque)
C->>C: сохранить NegotiatedProtocol,<br/>SessionBindingToken, SessionId
loop Данные
C->>A: Request.data_batch (seq_no, payload, …)
A->>C: Response.ack (seq_no)
end
Note over C,A: обрыв стрима → реконнект
```
Реконнект при согласованной версии **> 0**:
```mermaid
sequenceDiagram
participant C as Клиент
participant A as Unified Agent
C->>A: Request.initialize<br/>session_id = сохранённый<br/>accept_protocol_version = N<br/>session_binding_proof = прежний token<br/>…
A->>C: Response.initialized<br/>session_id, last_seq_no,<br/>protocol_version, session_binding_token (может обновиться)
C->>A: Request.data_batch …
```
---
### 1.4. Восстановление сессии и обмен ключами
- **Идентификатор сессии:** `session_id` приходит от сервера в `Initialized` и дальше передаётся в `Initialize` при реконнекте — основа для дедупликации и продолжения с теми же `seq_no`.
- **Shared secret:** по-прежнему `shared_secret_key` в `Initialize` (если задан в параметрах клиента) — отдельный канал авторизации/проверки, не смешивается с биндингом стрима.
- **Привязка сессии (protocol v1+):** после первого успешного `Initialized` с `protocol_version > 0` клиент хранит `session_binding_token`. При следующем `PrepareInitializeRequest`, если есть `NegotiatedProtocol > 0` и непустой токен, в запрос кладётся `session_binding_proof` (содержимое токена). Так сервер связывает новый gRPC-стрим с прежней логической сессией.
- **Конфликт сессии:** при завершении gRPC-вызова с кодом **`ALREADY_EXISTS`** клиент **сбрасывает** `SessionId`, `NegotiatedProtocol` и `SessionBindingToken`, чтобы следующий коннект не повторял конфликтующий идентификатор и не слал устаревшее proof (см. `OnGrpcCallFinished` в `client_impl.cpp`).
---
### 1.5. Совместимость клиентов и серверов
| Клиент | Сервер | Результат |
|--------|--------|-----------|
| `MaxAcceptProtocolVersion = 0` | любой | Поле `accept_protocol_version` не отправляется; ожидается legacy-ответ (`protocol_version` 0 / отсутствует). Биндинг по токену не используется. |
| `MaxAcceptProtocolVersion > 0` | только legacy | Обычно `protocol_version` в ответе 0 или отсутствует — клиент остаётся в legacy для этой сессии. |
| `MaxAcceptProtocolVersion > 0` | поддерживает v1+ | Согласуется конкретное число (например 1); включаются `session_binding_token` / `session_binding_proof` на реконнектах. |
| Новый клиент | старый агент | Безопасный откат: нет обязательных новых полей в wire-format для legacy; сервер игнорирует неизвестные optional-поля (proto3). |
Важно: поведение «только новый протокол» для отдельных механизмов (например принудительная отмена стрима по неактивности) в клиенте завязано на **фактически согласованную** версию **`NegotiatedProtocol > 0`**, а не только на настройку `MaxAcceptProtocolVersion`.
---
## 2. Изменения в клиенте — исправления и защита от регрессий
Ниже — логика, связанная с новым протоколом и устойчивостью сессий (файл `client_impl.cpp`, заголовки `client.h` / `client_impl.h`).
1. **Согласование версии и биндинг** — `PrepareInitializeRequest` выставляет `accept_protocol_version` только при `MaxAcceptProtocolVersion > 0`; при наличии согласованной версии и токена добавляет `session_binding_proof`. `OnGrpcCallInitialized` выставляет `NegotiatedProtocol` и `SessionBindingToken` только если `protocol_version` задан и **> 0**, иначе очищает их (строгий legacy).
2. **Конфликт `ALREADY_EXISTS`** — при таком статусе завершения стрима сбрасываются `SessionId`, `NegotiatedProtocol` и `SessionBindingToken`, чтобы не зациклиться на неверной паре (session_id, proof) и не провоцировать повторные конфликты на стороне агента.
3. **Watchdog неактивности gRPC (`GrpcCallInactivityTimeout`)** — принудительное закрытие активного вызова (`BeginClose(true)`) и счётчик `GrpcCallsClosedByInactivity` выполняются **только** при `NegotiatedProtocol.Defined() && *NegotiatedProtocol > 0`. Для legacy и до первого успешного `Initialized` с ненулевой версией отмена по этому таймеру **не** выполняется; таймер перепланируется как раньше. Это устраняет нежелательное принудительное реконнект-поведение на транспортах, где ранее допускалось «молчание» без отмены (см. план по этому пути).
4. **Пост-fork дочерний процесс** — сброс `SessionId`, `NegotiatedProtocol`, `SessionBindingToken` вместе с очередями, чтобы дочерний процесс не унаследовал привязку чужой сессии.
Документация в публичном API: комментарий к `SetGrpcCallInactivityTimeout` в `client.h` описывает ограничение по согласованной версии протокола.
---
*При необходимости уточнения формулы `min(accept, server_max)` на стороне агента смотрите реализацию сервера в репозитории `logbroker/unified_agent` (обработка `Initialize` в gRPC-сессии).*
commit_hash:9d5ef1cdc0faf793b4f56bfd2bafa362d7995ac5
Diffstat (limited to 'yt/cpp/mapreduce/client/client_reader.cpp')
0 files changed, 0 insertions, 0 deletions
