<feed xmlns='http://www.w3.org/2005/Atom'>
<title>ydb/library/cpp/unified_agent_client/client_impl.h, branch CLI_2.32.0</title>
<subtitle>Mirror of YDB github repos</subtitle>
<id>https://code.mastervirt.ru/ydb/atom?h=CLI_2.32.0</id>
<link rel='self' href='https://code.mastervirt.ru/ydb/atom?h=CLI_2.32.0'/>
<link rel='alternate' type='text/html' href='https://code.mastervirt.ru/ydb/'/>
<updated>2026-04-24T09:55:23Z</updated>
<entry>
<title>gRPC inactivity watchdog only for negotiated protocol v1+</title>
<updated>2026-04-24T09:55:23Z</updated>
<author>
<name>andybg</name>
<email>andybg@yandex-team.com</email>
</author>
<published>2026-04-24T09:15:16Z</published>
<link rel='alternate' type='text/html' href='https://code.mastervirt.ru/ydb/commit/?id=cd465393cf4f26e84ccc5554532cfbde88500f9a'/>
<id>urn:sha1:cd465393cf4f26e84ccc5554532cfbde88500f9a</id>
<content type='text'>
Unified Agent: новый протокол стриминга и изменения в клиенте

Документ опирается на `library/cpp/unified_agent_client/proto/unified_agent.proto` и реализацию `TClientSession` в `client_impl.cpp`.

---

## 1. Новый протокол

**Legacy** — сессия без согласования версии: в ответе `Initialized` поле `protocol_version` отсутствует или равно **0**. Поведение соответствует старому контракту до появления опциональных полей согласования.

**Версионированный режим (v1 и выше)** — клиент объявляет максимально поддерживаемую версию (`accept_protocol_version`), агент возвращает **согласованную** версию `protocol_version` и опционально **opaque**-токен привязки сессии. Дальнейшие реконнекты с тем же `session_id` требуют передачи **доказательства** привязки (`session_binding_proof`), чтобы сервер мог отличить легитимное продолжение от подмены.

Семантика потока данных не меняется: после `Initialized` идут `DataBatch` / `Ack`, ретраи по `seq_no` и дедупликация на стороне агента при совпадении `session_id` и повторной отправке записей.

---

### 1.2. Новые поля

**Запрос `Request.Initialize`**

| Поле | Тип | Назначение |
|------|-----|------------|
| `accept_protocol_version` | `optional uint32` | Верхняя граница версии, которую клиент готов использовать (стиль HTTP Accept). **Не задано или 0** — клиент не предлагает новый протокол (legacy-only). |
| `session_binding_proof` | `bytes` | Доказательство привязки при реконнекте; используется, когда согласована версия **&gt; 0** и у клиента есть токен от предыдущего `Initialized`. |

Поля `session_id`, `meta`, `shared_secret_key` — как раньше; `session_id` при реконнекте передаётся для дедупликации.

**Ответ `Response.Initialized`**

| Поле | Тип | Назначение |
|------|-----|------------|
| `protocol_version` | `optional uint32` | Согласованная версия: **min**(accept клиента, максимум сервера). **Не задано или 0** — сессия считается **legacy**. |
| `session_binding_token` | `bytes` | Непрозрачный токен: клиент обязан вернуть его в следующем `Initialize` как `session_binding_proof` при переподключении (для версий протокола **≥ 1**). |

Остальное без изменений: `session_id`, `last_seq_no`.

---

### 1.3. Процесс и логика выбора версии

1. Клиент задаёт **`MaxAcceptProtocolVersion`** (в коде: `SetMaxAcceptProtocolVersion`). Значение **0** означает: поле `accept_protocol_version` в `Initialize` **не отправляется** — только legacy.
2. Если `MaxAcceptProtocolVersion &gt; 0`, в первом (и последующих) сообщении `Initialize` выставляется `accept_protocol_version = MaxAcceptProtocolVersion`.
3. Агент вычисляет согласованную версию как **min**(`accept_protocol_version`, собственный потолок поддерживаемых версий) и возвращает её в `Initialized.protocol_version`. Если клиент не прислал accept (legacy-only) или сервер отвечает **0** / не задаёт поле — на клиенте фиксируется **legacy** (`NegotiatedProtocol` сбрасывается).
4. При успешном `Initialized` с **`protocol_version &gt; 0`** клиент сохраняет число версии в `NegotiatedProtocol` и копирует `session_binding_token` во внутреннее состояние для следующих коннектов.

#### Диаграмма обмена (Mermaid sequence)

Первое подключение с согласованием версии и привязкой:

```mermaid
sequenceDiagram
    participant C as Клиент (TClientSession)
    participant G as gRPC stream
    participant A as Unified Agent

    C-&gt;&gt;G: открыть Session(stream Request / stream Response)
    C-&gt;&gt;A: Request.initialize&lt;br/&gt;accept_protocol_version = N (если N&gt;0)&lt;br/&gt;session_id (опц.)&lt;br/&gt;meta, shared_secret_key, …

    Note over A: min(N, server_max) → negotiated

    A-&gt;&gt;C: Response.initialized&lt;br/&gt;session_id&lt;br/&gt;last_seq_no&lt;br/&gt;protocol_version = negotiated&lt;br/&gt;session_binding_token (opaque)

    C-&gt;&gt;C: сохранить NegotiatedProtocol,&lt;br/&gt;SessionBindingToken, SessionId

    loop Данные
        C-&gt;&gt;A: Request.data_batch (seq_no, payload, …)
        A-&gt;&gt;C: Response.ack (seq_no)
    end

    Note over C,A: обрыв стрима → реконнект
```

Реконнект при согласованной версии **&gt; 0**:

```mermaid
sequenceDiagram
    participant C as Клиент
    participant A as Unified Agent

    C-&gt;&gt;A: Request.initialize&lt;br/&gt;session_id = сохранённый&lt;br/&gt;accept_protocol_version = N&lt;br/&gt;session_binding_proof = прежний token&lt;br/&gt;…

    A-&gt;&gt;C: Response.initialized&lt;br/&gt;session_id, last_seq_no,&lt;br/&gt;protocol_version, session_binding_token (может обновиться)

    C-&gt;&gt;A: Request.data_batch …
```

---

### 1.4. Восстановление сессии и обмен ключами

- **Идентификатор сессии:** `session_id` приходит от сервера в `Initialized` и дальше передаётся в `Initialize` при реконнекте — основа для дедупликации и продолжения с теми же `seq_no`.
- **Shared secret:** по-прежнему `shared_secret_key` в `Initialize` (если задан в параметрах клиента) — отдельный канал авторизации/проверки, не смешивается с биндингом стрима.
- **Привязка сессии (protocol v1+):** после первого успешного `Initialized` с `protocol_version &gt; 0` клиент хранит `session_binding_token`. При следующем `PrepareInitializeRequest`, если есть `NegotiatedProtocol &gt; 0` и непустой токен, в запрос кладётся `session_binding_proof` (содержимое токена). Так сервер связывает новый gRPC-стрим с прежней логической сессией.
- **Конфликт сессии:** при завершении gRPC-вызова с кодом **`ALREADY_EXISTS`** клиент **сбрасывает** `SessionId`, `NegotiatedProtocol` и `SessionBindingToken`, чтобы следующий коннект не повторял конфликтующий идентификатор и не слал устаревшее proof (см. `OnGrpcCallFinished` в `client_impl.cpp`).

---

### 1.5. Совместимость клиентов и серверов

| Клиент | Сервер | Результат |
|--------|--------|-----------|
| `MaxAcceptProtocolVersion = 0` | любой | Поле `accept_protocol_version` не отправляется; ожидается legacy-ответ (`protocol_version` 0 / отсутствует). Биндинг по токену не используется. |
| `MaxAcceptProtocolVersion &gt; 0` | только legacy | Обычно `protocol_version` в ответе 0 или отсутствует — клиент остаётся в legacy для этой сессии. |
| `MaxAcceptProtocolVersion &gt; 0` | поддерживает v1+ | Согласуется конкретное число (например 1); включаются `session_binding_token` / `session_binding_proof` на реконнектах. |
| Новый клиент | старый агент | Безопасный откат: нет обязательных новых полей в wire-format для legacy; сервер игнорирует неизвестные optional-поля (proto3). |

Важно: поведение «только новый протокол» для отдельных механизмов (например принудительная отмена стрима по неактивности) в клиенте завязано на **фактически согласованную** версию **`NegotiatedProtocol &gt; 0`**, а не только на настройку `MaxAcceptProtocolVersion`.

---

## 2. Изменения в клиенте — исправления и защита от регрессий

Ниже — логика, связанная с новым протоколом и устойчивостью сессий (файл `client_impl.cpp`, заголовки `client.h` / `client_impl.h`).

1. **Согласование версии и биндинг** — `PrepareInitializeRequest` выставляет `accept_protocol_version` только при `MaxAcceptProtocolVersion &gt; 0`; при наличии согласованной версии и токена добавляет `session_binding_proof`. `OnGrpcCallInitialized` выставляет `NegotiatedProtocol` и `SessionBindingToken` только если `protocol_version` задан и **&gt; 0**, иначе очищает их (строгий legacy).

2. **Конфликт `ALREADY_EXISTS`** — при таком статусе завершения стрима сбрасываются `SessionId`, `NegotiatedProtocol` и `SessionBindingToken`, чтобы не зациклиться на неверной паре (session_id, proof) и не провоцировать повторные конфликты на стороне агента.

3. **Watchdog неактивности gRPC (`GrpcCallInactivityTimeout`)** — принудительное закрытие активного вызова (`BeginClose(true)`) и счётчик `GrpcCallsClosedByInactivity` выполняются **только** при `NegotiatedProtocol.Defined() &amp;&amp; *NegotiatedProtocol &gt; 0`. Для legacy и до первого успешного `Initialized` с ненулевой версией отмена по этому таймеру **не** выполняется; таймер перепланируется как раньше. Это устраняет нежелательное принудительное реконнект-поведение на транспортах, где ранее допускалось «молчание» без отмены (см. план по этому пути).

4. **Пост-fork дочерний процесс** — сброс `SessionId`, `NegotiatedProtocol`, `SessionBindingToken` вместе с очередями, чтобы дочерний процесс не унаследовал привязку чужой сессии.

Документация в публичном API: комментарий к `SetGrpcCallInactivityTimeout` в `client.h` описывает ограничение по согласованной версии протокола.

---

*При необходимости уточнения формулы `min(accept, server_max)` на стороне агента смотрите реализацию сервера в репозитории `logbroker/unified_agent` (обработка `Initialize` в gRPC-сессии).*
commit_hash:9d5ef1cdc0faf793b4f56bfd2bafa362d7995ac5
</content>
</entry>
<entry>
<title>C++ client reconnect after agent SIGKILL/long outage</title>
<updated>2026-04-10T10:30:47Z</updated>
<author>
<name>andybg</name>
<email>andybg@yandex-team.com</email>
</author>
<published>2026-04-10T10:11:09Z</published>
<link rel='alternate' type='text/html' href='https://code.mastervirt.ru/ydb/commit/?id=e65fc92c9d31ea9ffa49c59fea807c0f0b2c92fd'/>
<id>urn:sha1:e65fc92c9d31ea9ffa49c59fea807c0f0b2c92fd</id>
<content type='text'>
## Проблема

После аварийного завершения агента (SIGKILL) или длительной недоступности C++ клиент Unified Agent не восстанавливал соединение сам. Сессия оставалась в состоянии «активна», сообщения накапливались в буфере (inflight), при достижении лимита начинались потери (dropped messages), и доставка не возобновлялась даже после поднятия агента — требовался перезапуск приложения-клиента.

## Как исправлено

Добавлен механизм «страховки»: если у сессии есть не подтверждённые сообщения (inflight), но по активному gRPC-стриму долго нет никакой активности (ни read, ни write, ни finish), клиент принудительно отменяет текущий вызов и запускает обычный путь реконнекта (повторные попытки установки сессии). «Длительность молчания» задаётся новым параметром; по умолчанию механизм включён (30 с) — зависание сессии никому не нужно, отключить можно явно, выставив таймаут в 0.

{% cut "Технические детали" %}

**Корневая причина:** реконнект планировался только в `OnGrpcCallFinished`, то есть после завершения активного stream-вызова. При «зависшем» транспорте (transport stall после падения агента) gRPC мог не вызывать финальный callback, `ActiveGrpcCall` оставался занятым, и новый `MakeGrpcCall` не запускался.

**Изменения в коде:**
- В `TClientParameters` добавлен параметр `GrpcCallInactivityTimeout` (по умолчанию 30 с; 0 = отключено).
- В сессии заведён периодический watchdog-таймер; при срабатывании проверяется: есть ли активный `TGrpcCall`, есть ли inflight-сообщения и не было ли активности дольше заданного таймаута. При выполнении условий вызывается принудительное закрытие текущего call (`BeginClose(true)`), что приводит к `OnGrpcCallFinished` и планированию следующего `MakeGrpcCall` по `GrpcReconnectDelay`.
- Время последней активности обновляется при любом успешном событии стрима (Accept, Read, Write, Finish, а также при инициализации сессии и при ack).

**Тестирование:** добавлен интеграционный тест `TestReconnectAfterLongStall` (агент поднимается → сессия и первая доставка → убийство агента → отправка во время даунтайма → пауза → повторный запуск агента → проверка, что inflight обнуляется и сессия переинициализируется без рестарта приложения). Регрессии по существующим reconnect-тестам не наблюдаются.

{% endcut %}
commit_hash:f55efec2cff20fa975500e73d60ee57654abb2e0
</content>
</entry>
<entry>
<title>Add throttling for max inflight error logs</title>
<updated>2026-01-14T22:51:38Z</updated>
<author>
<name>andybg</name>
<email>andybg@yandex-team.com</email>
</author>
<published>2026-01-14T22:34:18Z</published>
<link rel='alternate' type='text/html' href='https://code.mastervirt.ru/ydb/commit/?id=743db03e460759247cc83b5c9f44cba60416b240'/>
<id>urn:sha1:743db03e460759247cc83b5c9f44cba60416b240</id>
<content type='text'>
## Problem
Log spam with repeated ``'max inflight of [***] bytes reached, [***] bytes dropped'`` errors appearing multiple times within the same second, causing log flooding.

## Solution
Implemented one-second throttling mechanism for this specific error message to prevent log spam while maintaining accurate counter tracking.

## Testing
- Added `TestMaxInflightBytesThrottling` integration test
- Confirms throttling limits log entries to ≤3 for 50 rapid message drops (vs 50 without throttling)
- Ensures all dropped message counters remain accurate regardless of throttling
commit_hash:58f44ca8ce2b8b416586f8ca7a3d3ca971f1e9cb
</content>
</entry>
<entry>
<title>feat grpc: update to grpc 1.53.1</title>
<updated>2023-07-17T16:35:29Z</updated>
<author>
<name>leonidlazarev</name>
<email>leonidlazarev@yandex-team.com</email>
</author>
<published>2023-07-17T16:35:29Z</published>
<link rel='alternate' type='text/html' href='https://code.mastervirt.ru/ydb/commit/?id=cb8e9a6330e4e5d9a0e2f8506e7469bbd641ec63'/>
<id>urn:sha1:cb8e9a6330e4e5d9a0e2f8506e7469bbd641ec63</id>
<content type='text'>
update grpc to 1.53.1
update grpcio/py3 to 1.53.1

Added patches:
    22-grpc-code-output.patch - allow translation of grpc code to internal string type.
    23-max-thread-limitation.patch - to provide interface for settings of thread number limit, as
    grpc::DynamicThreadPool doesn't provide interface to limit thread number anymore.
    24-support_for-non-abort-grpc.patch - generate exception instead of application crash
    25-forkable-destruction-order.patch - correct forkable logic for TimerManager
    27-skip-child-post-fork-operations.patch - allow to skip child post fork operations to exclude UB (used for unified agent only)
    pr33495_fox_nested_fork.patch - fix issues with nested forks
    pr33582_fork_handler.patch - disable fork handler support  if it is not requested intentionally</content>
</entry>
<entry>
<title>Log backend move</title>
<updated>2023-02-09T09:40:11Z</updated>
<author>
<name>hor911</name>
<email>hor911@ydb.tech</email>
</author>
<published>2023-02-09T09:40:11Z</published>
<link rel='alternate' type='text/html' href='https://code.mastervirt.ru/ydb/commit/?id=24689527cd888aa8a640ecb5077e656b3520d373'/>
<id>urn:sha1:24689527cd888aa8a640ecb5077e656b3520d373</id>
<content type='text'>
</content>
</entry>
</feed>
