<feed xmlns='http://www.w3.org/2005/Atom'>
<title>ydb/library/cpp/unified_agent_client, branch main</title>
<subtitle>Mirror of YDB github repos</subtitle>
<id>https://code.mastervirt.ru/ydb/atom?h=main</id>
<link rel='self' href='https://code.mastervirt.ru/ydb/atom?h=main'/>
<link rel='alternate' type='text/html' href='https://code.mastervirt.ru/ydb/'/>
<updated>2026-05-18T10:00:07Z</updated>
<entry>
<title>Intermediate changes</title>
<updated>2026-05-18T10:00:07Z</updated>
<author>
<name>robot-piglet</name>
<email>robot-piglet@yandex-team.com</email>
</author>
<published>2026-05-18T09:00:44Z</published>
<link rel='alternate' type='text/html' href='https://code.mastervirt.ru/ydb/commit/?id=ec25e7e0cf78d7c1dd10ee1afd538a284ffd9c53'/>
<id>urn:sha1:ec25e7e0cf78d7c1dd10ee1afd538a284ffd9c53</id>
<content type='text'>
commit_hash:7218aca25ba819156cd6a364f9bd4ef8598c49ef
</content>
</entry>
<entry>
<title>gRPC inactivity watchdog only for negotiated protocol v1+</title>
<updated>2026-04-24T09:55:23Z</updated>
<author>
<name>andybg</name>
<email>andybg@yandex-team.com</email>
</author>
<published>2026-04-24T09:15:16Z</published>
<link rel='alternate' type='text/html' href='https://code.mastervirt.ru/ydb/commit/?id=cd465393cf4f26e84ccc5554532cfbde88500f9a'/>
<id>urn:sha1:cd465393cf4f26e84ccc5554532cfbde88500f9a</id>
<content type='text'>
Unified Agent: новый протокол стриминга и изменения в клиенте

Документ опирается на `library/cpp/unified_agent_client/proto/unified_agent.proto` и реализацию `TClientSession` в `client_impl.cpp`.

---

## 1. Новый протокол

**Legacy** — сессия без согласования версии: в ответе `Initialized` поле `protocol_version` отсутствует или равно **0**. Поведение соответствует старому контракту до появления опциональных полей согласования.

**Версионированный режим (v1 и выше)** — клиент объявляет максимально поддерживаемую версию (`accept_protocol_version`), агент возвращает **согласованную** версию `protocol_version` и опционально **opaque**-токен привязки сессии. Дальнейшие реконнекты с тем же `session_id` требуют передачи **доказательства** привязки (`session_binding_proof`), чтобы сервер мог отличить легитимное продолжение от подмены.

Семантика потока данных не меняется: после `Initialized` идут `DataBatch` / `Ack`, ретраи по `seq_no` и дедупликация на стороне агента при совпадении `session_id` и повторной отправке записей.

---

### 1.2. Новые поля

**Запрос `Request.Initialize`**

| Поле | Тип | Назначение |
|------|-----|------------|
| `accept_protocol_version` | `optional uint32` | Верхняя граница версии, которую клиент готов использовать (стиль HTTP Accept). **Не задано или 0** — клиент не предлагает новый протокол (legacy-only). |
| `session_binding_proof` | `bytes` | Доказательство привязки при реконнекте; используется, когда согласована версия **&gt; 0** и у клиента есть токен от предыдущего `Initialized`. |

Поля `session_id`, `meta`, `shared_secret_key` — как раньше; `session_id` при реконнекте передаётся для дедупликации.

**Ответ `Response.Initialized`**

| Поле | Тип | Назначение |
|------|-----|------------|
| `protocol_version` | `optional uint32` | Согласованная версия: **min**(accept клиента, максимум сервера). **Не задано или 0** — сессия считается **legacy**. |
| `session_binding_token` | `bytes` | Непрозрачный токен: клиент обязан вернуть его в следующем `Initialize` как `session_binding_proof` при переподключении (для версий протокола **≥ 1**). |

Остальное без изменений: `session_id`, `last_seq_no`.

---

### 1.3. Процесс и логика выбора версии

1. Клиент задаёт **`MaxAcceptProtocolVersion`** (в коде: `SetMaxAcceptProtocolVersion`). Значение **0** означает: поле `accept_protocol_version` в `Initialize` **не отправляется** — только legacy.
2. Если `MaxAcceptProtocolVersion &gt; 0`, в первом (и последующих) сообщении `Initialize` выставляется `accept_protocol_version = MaxAcceptProtocolVersion`.
3. Агент вычисляет согласованную версию как **min**(`accept_protocol_version`, собственный потолок поддерживаемых версий) и возвращает её в `Initialized.protocol_version`. Если клиент не прислал accept (legacy-only) или сервер отвечает **0** / не задаёт поле — на клиенте фиксируется **legacy** (`NegotiatedProtocol` сбрасывается).
4. При успешном `Initialized` с **`protocol_version &gt; 0`** клиент сохраняет число версии в `NegotiatedProtocol` и копирует `session_binding_token` во внутреннее состояние для следующих коннектов.

#### Диаграмма обмена (Mermaid sequence)

Первое подключение с согласованием версии и привязкой:

```mermaid
sequenceDiagram
    participant C as Клиент (TClientSession)
    participant G as gRPC stream
    participant A as Unified Agent

    C-&gt;&gt;G: открыть Session(stream Request / stream Response)
    C-&gt;&gt;A: Request.initialize&lt;br/&gt;accept_protocol_version = N (если N&gt;0)&lt;br/&gt;session_id (опц.)&lt;br/&gt;meta, shared_secret_key, …

    Note over A: min(N, server_max) → negotiated

    A-&gt;&gt;C: Response.initialized&lt;br/&gt;session_id&lt;br/&gt;last_seq_no&lt;br/&gt;protocol_version = negotiated&lt;br/&gt;session_binding_token (opaque)

    C-&gt;&gt;C: сохранить NegotiatedProtocol,&lt;br/&gt;SessionBindingToken, SessionId

    loop Данные
        C-&gt;&gt;A: Request.data_batch (seq_no, payload, …)
        A-&gt;&gt;C: Response.ack (seq_no)
    end

    Note over C,A: обрыв стрима → реконнект
```

Реконнект при согласованной версии **&gt; 0**:

```mermaid
sequenceDiagram
    participant C as Клиент
    participant A as Unified Agent

    C-&gt;&gt;A: Request.initialize&lt;br/&gt;session_id = сохранённый&lt;br/&gt;accept_protocol_version = N&lt;br/&gt;session_binding_proof = прежний token&lt;br/&gt;…

    A-&gt;&gt;C: Response.initialized&lt;br/&gt;session_id, last_seq_no,&lt;br/&gt;protocol_version, session_binding_token (может обновиться)

    C-&gt;&gt;A: Request.data_batch …
```

---

### 1.4. Восстановление сессии и обмен ключами

- **Идентификатор сессии:** `session_id` приходит от сервера в `Initialized` и дальше передаётся в `Initialize` при реконнекте — основа для дедупликации и продолжения с теми же `seq_no`.
- **Shared secret:** по-прежнему `shared_secret_key` в `Initialize` (если задан в параметрах клиента) — отдельный канал авторизации/проверки, не смешивается с биндингом стрима.
- **Привязка сессии (protocol v1+):** после первого успешного `Initialized` с `protocol_version &gt; 0` клиент хранит `session_binding_token`. При следующем `PrepareInitializeRequest`, если есть `NegotiatedProtocol &gt; 0` и непустой токен, в запрос кладётся `session_binding_proof` (содержимое токена). Так сервер связывает новый gRPC-стрим с прежней логической сессией.
- **Конфликт сессии:** при завершении gRPC-вызова с кодом **`ALREADY_EXISTS`** клиент **сбрасывает** `SessionId`, `NegotiatedProtocol` и `SessionBindingToken`, чтобы следующий коннект не повторял конфликтующий идентификатор и не слал устаревшее proof (см. `OnGrpcCallFinished` в `client_impl.cpp`).

---

### 1.5. Совместимость клиентов и серверов

| Клиент | Сервер | Результат |
|--------|--------|-----------|
| `MaxAcceptProtocolVersion = 0` | любой | Поле `accept_protocol_version` не отправляется; ожидается legacy-ответ (`protocol_version` 0 / отсутствует). Биндинг по токену не используется. |
| `MaxAcceptProtocolVersion &gt; 0` | только legacy | Обычно `protocol_version` в ответе 0 или отсутствует — клиент остаётся в legacy для этой сессии. |
| `MaxAcceptProtocolVersion &gt; 0` | поддерживает v1+ | Согласуется конкретное число (например 1); включаются `session_binding_token` / `session_binding_proof` на реконнектах. |
| Новый клиент | старый агент | Безопасный откат: нет обязательных новых полей в wire-format для legacy; сервер игнорирует неизвестные optional-поля (proto3). |

Важно: поведение «только новый протокол» для отдельных механизмов (например принудительная отмена стрима по неактивности) в клиенте завязано на **фактически согласованную** версию **`NegotiatedProtocol &gt; 0`**, а не только на настройку `MaxAcceptProtocolVersion`.

---

## 2. Изменения в клиенте — исправления и защита от регрессий

Ниже — логика, связанная с новым протоколом и устойчивостью сессий (файл `client_impl.cpp`, заголовки `client.h` / `client_impl.h`).

1. **Согласование версии и биндинг** — `PrepareInitializeRequest` выставляет `accept_protocol_version` только при `MaxAcceptProtocolVersion &gt; 0`; при наличии согласованной версии и токена добавляет `session_binding_proof`. `OnGrpcCallInitialized` выставляет `NegotiatedProtocol` и `SessionBindingToken` только если `protocol_version` задан и **&gt; 0**, иначе очищает их (строгий legacy).

2. **Конфликт `ALREADY_EXISTS`** — при таком статусе завершения стрима сбрасываются `SessionId`, `NegotiatedProtocol` и `SessionBindingToken`, чтобы не зациклиться на неверной паре (session_id, proof) и не провоцировать повторные конфликты на стороне агента.

3. **Watchdog неактивности gRPC (`GrpcCallInactivityTimeout`)** — принудительное закрытие активного вызова (`BeginClose(true)`) и счётчик `GrpcCallsClosedByInactivity` выполняются **только** при `NegotiatedProtocol.Defined() &amp;&amp; *NegotiatedProtocol &gt; 0`. Для legacy и до первого успешного `Initialized` с ненулевой версией отмена по этому таймеру **не** выполняется; таймер перепланируется как раньше. Это устраняет нежелательное принудительное реконнект-поведение на транспортах, где ранее допускалось «молчание» без отмены (см. план по этому пути).

4. **Пост-fork дочерний процесс** — сброс `SessionId`, `NegotiatedProtocol`, `SessionBindingToken` вместе с очередями, чтобы дочерний процесс не унаследовал привязку чужой сессии.

Документация в публичном API: комментарий к `SetGrpcCallInactivityTimeout` в `client.h` описывает ограничение по согласованной версии протокола.

---

*При необходимости уточнения формулы `min(accept, server_max)` на стороне агента смотрите реализацию сервера в репозитории `logbroker/unified_agent` (обработка `Initialize` в gRPC-сессии).*
commit_hash:9d5ef1cdc0faf793b4f56bfd2bafa362d7995ac5
</content>
</entry>
<entry>
<title>C++ client reconnect after agent SIGKILL/long outage</title>
<updated>2026-04-10T10:30:47Z</updated>
<author>
<name>andybg</name>
<email>andybg@yandex-team.com</email>
</author>
<published>2026-04-10T10:11:09Z</published>
<link rel='alternate' type='text/html' href='https://code.mastervirt.ru/ydb/commit/?id=e65fc92c9d31ea9ffa49c59fea807c0f0b2c92fd'/>
<id>urn:sha1:e65fc92c9d31ea9ffa49c59fea807c0f0b2c92fd</id>
<content type='text'>
## Проблема

После аварийного завершения агента (SIGKILL) или длительной недоступности C++ клиент Unified Agent не восстанавливал соединение сам. Сессия оставалась в состоянии «активна», сообщения накапливались в буфере (inflight), при достижении лимита начинались потери (dropped messages), и доставка не возобновлялась даже после поднятия агента — требовался перезапуск приложения-клиента.

## Как исправлено

Добавлен механизм «страховки»: если у сессии есть не подтверждённые сообщения (inflight), но по активному gRPC-стриму долго нет никакой активности (ни read, ни write, ни finish), клиент принудительно отменяет текущий вызов и запускает обычный путь реконнекта (повторные попытки установки сессии). «Длительность молчания» задаётся новым параметром; по умолчанию механизм включён (30 с) — зависание сессии никому не нужно, отключить можно явно, выставив таймаут в 0.

{% cut "Технические детали" %}

**Корневая причина:** реконнект планировался только в `OnGrpcCallFinished`, то есть после завершения активного stream-вызова. При «зависшем» транспорте (transport stall после падения агента) gRPC мог не вызывать финальный callback, `ActiveGrpcCall` оставался занятым, и новый `MakeGrpcCall` не запускался.

**Изменения в коде:**
- В `TClientParameters` добавлен параметр `GrpcCallInactivityTimeout` (по умолчанию 30 с; 0 = отключено).
- В сессии заведён периодический watchdog-таймер; при срабатывании проверяется: есть ли активный `TGrpcCall`, есть ли inflight-сообщения и не было ли активности дольше заданного таймаута. При выполнении условий вызывается принудительное закрытие текущего call (`BeginClose(true)`), что приводит к `OnGrpcCallFinished` и планированию следующего `MakeGrpcCall` по `GrpcReconnectDelay`.
- Время последней активности обновляется при любом успешном событии стрима (Accept, Read, Write, Finish, а также при инициализации сессии и при ack).

**Тестирование:** добавлен интеграционный тест `TestReconnectAfterLongStall` (агент поднимается → сессия и первая доставка → убийство агента → отправка во время даунтайма → пауза → повторный запуск агента → проверка, что inflight обнуляется и сессия переинициализируется без рестарта приложения). Регрессии по существующим reconnect-тестам не наблюдаются.

{% endcut %}
commit_hash:f55efec2cff20fa975500e73d60ee57654abb2e0
</content>
</entry>
<entry>
<title>Drop backward compat with grpc-prev</title>
<updated>2026-03-20T10:18:33Z</updated>
<author>
<name>thegeorg</name>
<email>thegeorg@yandex-team.com</email>
</author>
<published>2026-03-20T09:02:21Z</published>
<link rel='alternate' type='text/html' href='https://code.mastervirt.ru/ydb/commit/?id=fc9fe39f913bd803446f53a85054aa45181fc1f0'/>
<id>urn:sha1:fc9fe39f913bd803446f53a85054aa45181fc1f0</id>
<content type='text'>
commit_hash:e5545cade7cc946c943e85c680db7c276edc48b5
</content>
</entry>
<entry>
<title>Fix TAsyncJoiner "already joined" crash</title>
<updated>2026-03-14T02:16:45Z</updated>
<author>
<name>andybg</name>
<email>andybg@yandex-team.com</email>
</author>
<published>2026-03-14T01:44:54Z</published>
<link rel='alternate' type='text/html' href='https://code.mastervirt.ru/ydb/commit/?id=f462fa40c277092fe8810533f85712c7ccb02215'/>
<id>urn:sha1:f462fa40c277092fe8810533f85712c7ccb02215</id>
<content type='text'>
## 1. Где и когда это может происходить

Гонка проявляется при **одновременном** завершении задачи (Unregister/Join) и выполнении finish-действия `CommitTimer()` в воркере. Типичные сценарии:

- **Остановка агента** — при shutdown закрываются сессии, по задачам вызывается Unregister(); в это же время воркер может доходить до finish-действий уже «уходящей» задачи.
- **Закрытие сессии/канала** — например, отключение клиента, ошибка — владелец вызывает Unregister() по задаче, в которой использовался таймер.
- **Плагины и задачи, подверженные багу:** любые, что используют `TLocalTimersQueue` (отложенные таймеры через `DelayedExecutor`). В коде UA это, в частности:
  - **http_output** (`plugins/lib/http/http_sender.cpp`) — таймеры для отложенных повторов запросов и flush;
  - **file_input** (`plugins/file_input/file_input.cpp`) — таймеры при работе с файлами;
  - **logbroker_output_new** (`plugins/logbroker_output_new/logbroker_output_impl.h`) — таймер обновления метрик.

Во всех этих случаях задача владеет `TLocalTimersQueue`, при установке таймера в finish-действие попадает `CommitTimer()`, и при быстром Unregister() возможна гонка с Join().

---

{% cut "Зачем потоку воркера вызывать Ref()" %}

Таймер планируется **асинхронно**: `DelayedExecutor.SetTimer(Timer, triggerTime)` регистрирует callback в отдельном потоке (sleeper). Когда время наступит, callback вызовется **уже после** того, как `Run()` задачи завершился. Задача и её контекст (сессия, `TLocalTimersQueue`, `ExecutionJoiner`) должны оставаться валидными до срабатывания или отмены таймера — иначе callback приведёт к use-after-free.

**Ref()** — это «удержание» задачи: пока есть лишний Ref, `Join()` не завершится (Refs не станет 0). То есть: «задача не считается полностью завершённой, пока таймер не сработал или не был сброшен». Когда таймер сбрасывают или очередь финализируют, вызывается **UnRef()** — тогда задача может перейти в joined.

**Где именно вызывается Ref/UnRef:**

Файл `logbroker/unified_agent/common/delayed_executor.cpp`:

```cpp
void TLocalTimersQueue::CommitTimer() {
    if (Queue.GetCount() &gt; 0) {
        const auto triggerTime = Top().Value();
        if (!TimerTriggerTime.Defined() || ...) {
            if (!TimerTriggerTime.Defined()) {
                // ← ЗДЕСЬ: перед первой установкой таймера в DelayedExecutor
                // держим задачу «живой» до срабатывания/сброса таймера
                if (!TTaskExecutor::CurrentTaskOrDie().ExecutionJoiner().TryRef()) {
                    CommitTimerScheduled = false;
                    return;
                }
            }
            DelayedExecutor.SetTimer(Timer, triggerTime);  // асинхронный таймер
            TimerTriggerTime = triggerTime;
        }
    } else if (TimerTriggerTime.Defined()) {
        DelayedExecutor.ResetTimer(Timer);
        TTaskExecutor::CurrentTaskOrDie().ExecutionJoiner().UnRef();  // таймер снят — отдаём Ref
        TimerTriggerTime.Clear();
    }
    ...
}
```

`CommitTimer()` вызывается из **finish-действия** задачи (добавляется в `EnsureCommitTimerScheduled()` → `AddFinishAction([this]() { CommitTimer(); })`), т.е. выполняется в потоке воркера после выхода из `Run()`. Пример использования таймера из кода плагина — `plugins/lib/http/http_sender.cpp`: там `LocalTimerQueue.SetTimer(request-&gt;Timer, triggerTime)` планирует отложенный повтор запроса; callback при срабатывании постит событие в сессию.

{% endcut %}

---

{% cut "Контекст: участники и суть гонки" %}

**Участники:**
- **ExecutionJoiner** (`TAsyncJoiner`) — объект с атомарным счётчиком `Refs` (начальное значение 1). Пока `Refs &gt;= 1`, задачу считают «активной». `Join()` вызывает `UnRef()`; когда `Refs` становится 0, вызывается `Promise.SetValue()` («joined»).
- **Поток задачи (Task/Unregister)** — владелец задачи; вызывает `Unregister()` → `ExecutionJoiner_.Join()` → внутри один раз `UnRef()`.
- **Поток таймера (Worker/Timer)** — воркер пула задач; выполняет finish-действия задачи. Одно из них — `CommitTimer()`, которое при первой установке таймера вызывало `ExecutionJoiner().Ref()`.

**Гонка:** между моментом, когда поток задачи делает `Join()` (и доводит `Refs` до 0), и моментом, когда поток воркера выполняет `CommitTimer()` и вызывает `Ref()`. Если `Ref()` вызывается уже после перехода в «joined», `fetch_add(1)` возвращает 0 и срабатывает `Y_ABORT_UNLESS(result &gt;= 1, "already joined")`.

{% endcut %}

---

{% cut "До фикса: креш при гонке" %}

Поток задачи вызывает `Unregister()` и ждёт `Join()`. Поток воркера после завершения `Run()` выполняет finish-действие `CommitTimer()`. Если к этому моменту `Join()` уже выполнил `UnRef()` и `Refs == 0`, вызов `Ref()` в `CommitTimer()` приводит к падению.

```mermaid
sequenceDiagram
    participant TaskThread as Поток задачи
    participant Joiner as ExecutionJoiner
    participant WorkerThread as Поток воркера

    Note over TaskThread,WorkerThread: Задача с таймером: Run() вызвал SetTimer(), в finish-действия добавлен CommitTimer()

    TaskThread-&gt;&gt;TaskThread: Unregister()
    TaskThread-&gt;&gt;Joiner: Join()
    Joiner-&gt;&gt;Joiner: UnRef() → Refs = 0
    Joiner-&gt;&gt;Joiner: Promise.SetValue() — joined

    WorkerThread-&gt;&gt;WorkerThread: Выполняет finish-действия
    WorkerThread-&gt;&gt;WorkerThread: CommitTimer()
    WorkerThread-&gt;&gt;Joiner: Ref()
    Joiner-&gt;&gt;Joiner: fetch_add(1) → result = 0
    Joiner-&gt;&gt;WorkerThread: Y_ABORT_UNLESS(result &gt;= 1) — CRASH
```

**Итог:** в момент вызова `Ref()` в `CommitTimer()` объект уже в состоянии «joined» (`Refs == 0`), проверка в `Ref()` не выполняется → **SIGABRT**.

{% endcut %}

---

{% cut "После фикса: корректный выход без креша" %}

В `CommitTimer()` вместо `Ref()` вызывается `TryRef()`: атомарно проверяется `Refs &gt;= 1` (через CAS); если уже 0, `TryRef()` возвращает `false` и `CommitTimer()` сразу выходит, не вызывая `Ref()` и не трогая таймер.

```mermaid
sequenceDiagram
    participant TaskThread as Поток задачи
    participant Joiner as ExecutionJoiner
    participant WorkerThread as Поток воркера

    Note over TaskThread,WorkerThread: Та же гонка: Join() и CommitTimer() выполняются почти одновременно

    TaskThread-&gt;&gt;TaskThread: Unregister()
    TaskThread-&gt;&gt;Joiner: Join()
    Joiner-&gt;&gt;Joiner: UnRef() → Refs = 0
    Joiner-&gt;&gt;Joiner: Promise.SetValue() — joined

    WorkerThread-&gt;&gt;WorkerThread: Выполняет finish-действия
    WorkerThread-&gt;&gt;WorkerThread: CommitTimer()
    WorkerThread-&gt;&gt;Joiner: TryRef()
    Joiner-&gt;&gt;Joiner: load(Refs) = 0 → current &lt; 1
    Joiner-&gt;&gt;WorkerThread: return false
    WorkerThread-&gt;&gt;WorkerThread: CommitTimerScheduled = false return
    Note over WorkerThread: Таймер не ставится, креша нет
```

**Итог:** при уже «joined» состоянии `TryRef()` возвращает `false`, `CommitTimer()` завершается без вызова `Ref()` и без падения.

{% endcut %}

---

{% cut "Сводка изменений" %}

| Место | До фикса | После фикса |
|-------|----------|-------------|
| `CommitTimer()` при первой установке таймера | `Ref()` → при Refs=0 креш | `TryRef()` → при `false` ранний выход |
| `TAsyncJoiner` | Только `Ref()` / `UnRef()` | Добавлен `TryRef()` (CAS, при refs&lt;1 возврат false) |
| `Finalize()` | Не сбрасывал активный таймер | При `TimerTriggerTime.Defined()` — `ResetTimer`, `UnRef()`, `Clear()` до `Finalized = true` |

Тест `TestTimerQueueUnregisterNoCrash` (500 итераций: задача с таймером → Pulse → Unregister) без фикса периодически воспроизводит креш; с фиксом — стабильно зелёный.

{% endcut %}
commit_hash:5f57d88fc53f44db31e87deaeca57a7e9ef262ca
</content>
</entry>
<entry>
<title>/0: Implement TExecutorSet and TExecutorSetManager</title>
<updated>2026-02-25T12:06:35Z</updated>
<author>
<name>iofik</name>
<email>iofik@yandex-team.com</email>
</author>
<published>2026-02-25T11:02:41Z</published>
<link rel='alternate' type='text/html' href='https://code.mastervirt.ru/ydb/commit/?id=02e0966b2c68a74e2525633c92fa8219e9b8ef4a'/>
<id>urn:sha1:02e0966b2c68a74e2525633c92fa8219e9b8ef4a</id>
<content type='text'>
## Абстракция управления executor’ами (TExecutorSet, TExecutorSetManager)

### Что сделано

Вводится единый слой для работы с пулами потоков agent’а: main, background и IO.

**TExecutorSet** — представление набора executor’ов:
- `Main()`, `Background()`, `IO()` — одинаковый интерфейс для всех трёх пулов
- IO возвращает тот же объект, что и Main, когда `separate_io_thread_pool == 0`, иначе — отдельный executor

**TExecutorSetManager** — создание и владение executor’ами:
- `GetExecutors(EExecutorPool pool)` — возвращает нужный набор; пока используются только `Global` и `Metrics` (оба отдают один и тот же набор)
- `Start()`, `Stop()` — жизненный цикл
- Main создаётся сразу, Background и IO — лениво при первом обращении

**Рефакторинг TAgent**:
- Вместо отдельных полей `TaskExecutor`, `BackgroundTaskExecutor`, `IOTaskExecutor` и `DelayedExecutor` используется `ExecutorSetManager`
- Обращения к executor’ам идут через `ExecutorSetManager-&gt;GetExecutors(EExecutorPool::Global)`
- FSNotifier, MemoryQuotaManager, AgentContext, CountersUpdater работают с глобальными executor’ами

**RegisterCountersWith**:
- Регистрирует счётчики main, background, IO (если включён отдельный IO-пул) и delayed
- Раньше IO-счётчики не учитывались — это исправлено

**Тесты**:
- Добавлен gtest-модуль `executor_set_tests` в `tests/gtests/`:
  - Проверяются GetExecutors, доступность Main, ленивая инициализация Background
  - Поведение IO при `separate_io_thread_pool == 0` и `&gt; 0`
  - Start/Stop и параметр pool в GetExecutors

### Цель

Подготовка к PR1: выделение отдельных пулов для pipelines. Сейчас меняется только абстракция, без изменения конфигурации и без добавления dedicated pools.
commit_hash:0f1b2205c474db1b601a72f063916333a74882d6
</content>
</entry>
<entry>
<title>/3: Get rid of YLOG*_F macros in favor of just YLOG*</title>
<updated>2026-02-20T18:52:08Z</updated>
<author>
<name>iofik</name>
<email>iofik@yandex-team.com</email>
</author>
<published>2026-02-20T18:25:16Z</published>
<link rel='alternate' type='text/html' href='https://code.mastervirt.ru/ydb/commit/?id=b972f11a5ffee8ffeac87ac0a4b0625a40e79136'/>
<id>urn:sha1:b972f11a5ffee8ffeac87ac0a4b0625a40e79136</id>
<content type='text'>
_Все вызовы вида YLOG\*\_F(...) заменены на YLOG\*_(...) по всему дереву unified\_agent (например, YLOG\_DEBUG\_F → YLOG\_DEBUG, YLOG\_ERR\_F → YLOG\_ERROR).
commit_hash:483c6f5cb6db2d44e9d71b427a697dc2850cba1f
</content>
</entry>
<entry>
<title>/2: Use our internal logger header, add logging metrics</title>
<updated>2026-02-19T20:07:43Z</updated>
<author>
<name>iofik</name>
<email>iofik@yandex-team.com</email>
</author>
<published>2026-02-19T19:43:26Z</published>
<link rel='alternate' type='text/html' href='https://code.mastervirt.ru/ydb/commit/?id=ed109965ec70b4e46ead9312cc5e33c2e561d154'/>
<id>urn:sha1:ed109965ec70b4e46ead9312cc5e33c2e561d154</id>
<content type='text'>
# Улучшения библиотеки логирования и переход на троттлинг логов

## Описание

Этот PR содержит улучшения системы логирования unified\_agent с акцентом на предотвращение флуда логов и добавление метрик логирования.

## Основные изменения

### 1. Улучшения библиотеки логирования (`library/cpp/unified_agent_client/logger`)

- **Встроенный троттлинг логов**: Добавлена поддержка ограничения частоты логирования на уровне библиотеки

  - Новые макросы `YLOG_*_T` с автоматическим троттлингом (20 логов на 10 секунд по умолчанию)
  - Независимый троттлинг для каждой точки логирования (по `__FILE__:__LINE__`)
  - Автоматический подсчет подавленных сообщений с выводом `[+N suppressed]`

- **Метрики логирования**: Добавлены счетчики для мониторинга активности логирования

  - `RecordsReceived` - общее количество попыток логирования
  - `RecordsDropped` - количество подавленных сообщений из-за троттлинга
  - Счетчики передаются через `TLogger::TCounters` при создании логгера

- **Оптимизация производительности**:

  - Использование `GetCycleCount()` для быстрого получения времени (вместо системных вызовов)
  - Relaxed memory ordering для атомарных операций (достаточно для троттлинга)
  - Минимальные накладные расходы при отключенном логировании

### 2. Переход всех логов агента на троттлинг

- **Унификация макросов**: Все макросы `YLOG_*` в `logbroker/unified_agent/common/util/logger.h` теперь используют троттлинг

  - `YLOG_DEBUG`, `YLOG_INFO`, `YLOG_WARNING`, `YLOG_ERROR` и т.д. теперь автоматически применяют троттлинг
  - Старые макросы `YLOG_*_F` теперь алиасы для новых троттлированных версий
  - Обратная совместимость полностью сохранена

- **Обновление документации**: Файл `for_ai_cpp.md` обновлен с новыми рекомендациями по логированию

### 3. Интеграция метрик логирования в телеметрию

- **Новые счетчики в `TAgentLogCounters`**:

  - `RecordsReceived` - rate метрика `agent.log.records_received`
  - `RecordsDropped` - rate метрика `agent.log.records_dropped`

- **Экспорт метрик**: Метрики логирования автоматически собираются и отправляются в телеметрию агента

- **Рефакторинг конструктора `TAgent`**:

  - Упрощена передача счетчиков через структуру `TAgent::TCounters`
  - Счетчики логирования передаются в `TLogger` при инициализации

### 4. Тесты

- **Перенос тестов**: Тесты логирования перемещены из `logbroker/unified_agent/tests/gtests/logger_tests` в `library/cpp/unified_agent_client/ut`
- **Расширенное покрытие**:
  - Тесты базового троттлинга
  - Тесты счетчика подавленных сообщений
  - Тесты независимого троттлинга для разных точек логирования
  - Тесты форматирования сообщений
  - Тесты счетчиков метрик

### 5. Исправления и улучшения

- **Удаление дублирования кода**: Логика троттлинга теперь находится только в `library/cpp/unified_agent_client/logger.cpp`
- **Упрощение API**: Удален отдельный файл `logbroker/unified_agent/common/util/logger.cpp`
- **Обновление импортов**: Все файлы обновлены для использования правильных заголовочных файлов

## Преимущества

1. **Защита от флуда логов**: Автоматическое ограничение частоты логирования предотвращает переполнение логов
2. **Наблюдаемость**: Метрики логирования позволяют отслеживать активность и проблемы с логированием
3. **Производительность**: Минимальные накладные расходы благодаря оптимизированной реализации
4. **Простота использования**: Троттлинг работает автоматически, не требует изменений в коде
5. **Обратная совместимость**: Все существующие макросы продолжают работать

## Тестирование

- ✅ Все unit-тесты логирования проходят
- ✅ Integration тесты обновлены (исключение нестабильной метрики `RecordsReceived` из сравнений)
- ✅ Проверена работа троттлинга в реальных условиях
commit_hash:75fc97a8576114446bfb9ec11efbb80df322e443
</content>
</entry>
<entry>
<title>/0: Implement logging with throttling, replace YLOG macro with a new one</title>
<updated>2026-02-15T17:01:58Z</updated>
<author>
<name>iofik</name>
<email>iofik@yandex-team.com</email>
</author>
<published>2026-02-15T16:35:37Z</published>
<link rel='alternate' type='text/html' href='https://code.mastervirt.ru/ydb/commit/?id=e77160b4c3e62da8be22fa56345b1f80283a422f'/>
<id>urn:sha1:e77160b4c3e62da8be22fa56345b1f80283a422f</id>
<content type='text'>
### Добавлен троттлинг логов для предотвращения флуда

**Основная цель:** Защита от переполнения логов повторяющимися сообщениями.

#### Что добавлено:

1. **Механизм троттлинга логов** ([`common/util/logger.h`](common/util/logger.h), [`common/util/logger.cpp`](common/util/logger.cpp))
   - Ограничение: по умолчанию максимум 20 сообщений за 10 секунд для каждой точки логирования
   - Независимый троттлинг для каждого места вызова `YLOG` (через `static` переменную состояния)
   - Подсчёт пропущенных сообщений с выводом в формате `[+N suppressed]`
   - Минимальные накладные расходы: использует `GetCycleCount()` вместо системных вызовов времени
   - Атомарные операции для потокобезопасности

2. **Настраиваемый троттлинг**
   - Макрос `YLOG_THROTTLED(logger, priority, maxLogs, intervalSec, ...)` для кастомных лимитов
   - Макросы `YLOG_THROTTLE_MAX_LOGS` и `YLOG_THROTTLE_INTERVAL_SECS` для глобальной настройки (например, в тестах)

3. **Поддержка C++20 `std::format`**
   - Макросы `YLOG_*` теперь поддерживают форматирование в стиле C++20: `YLOG_ERR("value: {}", x)`
   - Обратная совместимость с `YLOG_*_F` макросами сохранена
   - Специальная обработка `TStringBuilder` для избежания проблем с временными объектами

4. **Тесты** ([`tests/gtests/logger_tests/`](tests/gtests/logger_tests/))
   - Проверка базового троттлинга
   - Проверка счётчика пропущенных сообщений
   - Проверка независимости троттлинга для разных точек логирования
   - Проверка форматирования сообщений

#### Технические изменения:

**Изменён порядок аргументов в макросе `YLOG`:**
- Было: `YLOG(priority, message, logger)`
- Стало: `YLOG(logger, priority, message, ...)`

Это изменение потребовало обновления всех вызовов `YLOG` в кодовой базе (~30 файлов), но обеспечивает:
- Единообразие с другими логирующими библиотеками
- Возможность использования variadic arguments для форматирования
- Правильную работу с `const auto&amp;` для продления времени жизни временных объектов

#### Примеры использования:

```cpp
// Базовое использование (троттлинг 20 логов/10 сек)
YLOG_ERR("Error occurred");
YLOG_INFO("Processing item: {}", itemId);
```

#### Влияние на производительность:

- Минимальные накладные расходы: ~10-20 наносекунд на вызов (использование CPU cycles вместо системного времени)
- Атомарные операции только при необходимости логирования
- Отсутствие дополнительных (по сравнению с текущим кодом) аллокаций памяти в hot path

#### Обратная совместимость:

- Все существующие `YLOG_*` и `YLOG_*_F` макросы работают как раньше
- Добавлен троттлинг по умолчанию для всех логов (может потребовать внимания при отладке)
commit_hash:d6e98b82486b99b945a1ba264fb2da969db4fcfc
</content>
</entry>
<entry>
<title>Add throttling for max inflight error logs</title>
<updated>2026-01-14T22:51:38Z</updated>
<author>
<name>andybg</name>
<email>andybg@yandex-team.com</email>
</author>
<published>2026-01-14T22:34:18Z</published>
<link rel='alternate' type='text/html' href='https://code.mastervirt.ru/ydb/commit/?id=743db03e460759247cc83b5c9f44cba60416b240'/>
<id>urn:sha1:743db03e460759247cc83b5c9f44cba60416b240</id>
<content type='text'>
## Problem
Log spam with repeated ``'max inflight of [***] bytes reached, [***] bytes dropped'`` errors appearing multiple times within the same second, causing log flooding.

## Solution
Implemented one-second throttling mechanism for this specific error message to prevent log spam while maintaining accurate counter tracking.

## Testing
- Added `TestMaxInflightBytesThrottling` integration test
- Confirms throttling limits log entries to ≤3 for 50 rapid message drops (vs 50 without throttling)
- Ensures all dropped message counters remain accurate regardless of throttling
commit_hash:58f44ca8ce2b8b416586f8ca7a3d3ca971f1e9cb
</content>
</entry>
</feed>
