| Commit message (Collapse) | Author | Age | Files | Lines |
| |
|
|
| |
commit_hash:7218aca25ba819156cd6a364f9bd4ef8598c49ef
|
| |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| |
Unified Agent: новый протокол стриминга и изменения в клиенте
Документ опирается на `library/cpp/unified_agent_client/proto/unified_agent.proto` и реализацию `TClientSession` в `client_impl.cpp`.
---
## 1. Новый протокол
**Legacy** — сессия без согласования версии: в ответе `Initialized` поле `protocol_version` отсутствует или равно **0**. Поведение соответствует старому контракту до появления опциональных полей согласования.
**Версионированный режим (v1 и выше)** — клиент объявляет максимально поддерживаемую версию (`accept_protocol_version`), агент возвращает **согласованную** версию `protocol_version` и опционально **opaque**-токен привязки сессии. Дальнейшие реконнекты с тем же `session_id` требуют передачи **доказательства** привязки (`session_binding_proof`), чтобы сервер мог отличить легитимное продолжение от подмены.
Семантика потока данных не меняется: после `Initialized` идут `DataBatch` / `Ack`, ретраи по `seq_no` и дедупликация на стороне агента при совпадении `session_id` и повторной отправке записей.
---
### 1.2. Новые поля
**Запрос `Request.Initialize`**
| Поле | Тип | Назначение |
|------|-----|------------|
| `accept_protocol_version` | `optional uint32` | Верхняя граница версии, которую клиент готов использовать (стиль HTTP Accept). **Не задано или 0** — клиент не предлагает новый протокол (legacy-only). |
| `session_binding_proof` | `bytes` | Доказательство привязки при реконнекте; используется, когда согласована версия **> 0** и у клиента есть токен от предыдущего `Initialized`. |
Поля `session_id`, `meta`, `shared_secret_key` — как раньше; `session_id` при реконнекте передаётся для дедупликации.
**Ответ `Response.Initialized`**
| Поле | Тип | Назначение |
|------|-----|------------|
| `protocol_version` | `optional uint32` | Согласованная версия: **min**(accept клиента, максимум сервера). **Не задано или 0** — сессия считается **legacy**. |
| `session_binding_token` | `bytes` | Непрозрачный токен: клиент обязан вернуть его в следующем `Initialize` как `session_binding_proof` при переподключении (для версий протокола **≥ 1**). |
Остальное без изменений: `session_id`, `last_seq_no`.
---
### 1.3. Процесс и логика выбора версии
1. Клиент задаёт **`MaxAcceptProtocolVersion`** (в коде: `SetMaxAcceptProtocolVersion`). Значение **0** означает: поле `accept_protocol_version` в `Initialize` **не отправляется** — только legacy.
2. Если `MaxAcceptProtocolVersion > 0`, в первом (и последующих) сообщении `Initialize` выставляется `accept_protocol_version = MaxAcceptProtocolVersion`.
3. Агент вычисляет согласованную версию как **min**(`accept_protocol_version`, собственный потолок поддерживаемых версий) и возвращает её в `Initialized.protocol_version`. Если клиент не прислал accept (legacy-only) или сервер отвечает **0** / не задаёт поле — на клиенте фиксируется **legacy** (`NegotiatedProtocol` сбрасывается).
4. При успешном `Initialized` с **`protocol_version > 0`** клиент сохраняет число версии в `NegotiatedProtocol` и копирует `session_binding_token` во внутреннее состояние для следующих коннектов.
#### Диаграмма обмена (Mermaid sequence)
Первое подключение с согласованием версии и привязкой:
```mermaid
sequenceDiagram
participant C as Клиент (TClientSession)
participant G as gRPC stream
participant A as Unified Agent
C->>G: открыть Session(stream Request / stream Response)
C->>A: Request.initialize<br/>accept_protocol_version = N (если N>0)<br/>session_id (опц.)<br/>meta, shared_secret_key, …
Note over A: min(N, server_max) → negotiated
A->>C: Response.initialized<br/>session_id<br/>last_seq_no<br/>protocol_version = negotiated<br/>session_binding_token (opaque)
C->>C: сохранить NegotiatedProtocol,<br/>SessionBindingToken, SessionId
loop Данные
C->>A: Request.data_batch (seq_no, payload, …)
A->>C: Response.ack (seq_no)
end
Note over C,A: обрыв стрима → реконнект
```
Реконнект при согласованной версии **> 0**:
```mermaid
sequenceDiagram
participant C as Клиент
participant A as Unified Agent
C->>A: Request.initialize<br/>session_id = сохранённый<br/>accept_protocol_version = N<br/>session_binding_proof = прежний token<br/>…
A->>C: Response.initialized<br/>session_id, last_seq_no,<br/>protocol_version, session_binding_token (может обновиться)
C->>A: Request.data_batch …
```
---
### 1.4. Восстановление сессии и обмен ключами
- **Идентификатор сессии:** `session_id` приходит от сервера в `Initialized` и дальше передаётся в `Initialize` при реконнекте — основа для дедупликации и продолжения с теми же `seq_no`.
- **Shared secret:** по-прежнему `shared_secret_key` в `Initialize` (если задан в параметрах клиента) — отдельный канал авторизации/проверки, не смешивается с биндингом стрима.
- **Привязка сессии (protocol v1+):** после первого успешного `Initialized` с `protocol_version > 0` клиент хранит `session_binding_token`. При следующем `PrepareInitializeRequest`, если есть `NegotiatedProtocol > 0` и непустой токен, в запрос кладётся `session_binding_proof` (содержимое токена). Так сервер связывает новый gRPC-стрим с прежней логической сессией.
- **Конфликт сессии:** при завершении gRPC-вызова с кодом **`ALREADY_EXISTS`** клиент **сбрасывает** `SessionId`, `NegotiatedProtocol` и `SessionBindingToken`, чтобы следующий коннект не повторял конфликтующий идентификатор и не слал устаревшее proof (см. `OnGrpcCallFinished` в `client_impl.cpp`).
---
### 1.5. Совместимость клиентов и серверов
| Клиент | Сервер | Результат |
|--------|--------|-----------|
| `MaxAcceptProtocolVersion = 0` | любой | Поле `accept_protocol_version` не отправляется; ожидается legacy-ответ (`protocol_version` 0 / отсутствует). Биндинг по токену не используется. |
| `MaxAcceptProtocolVersion > 0` | только legacy | Обычно `protocol_version` в ответе 0 или отсутствует — клиент остаётся в legacy для этой сессии. |
| `MaxAcceptProtocolVersion > 0` | поддерживает v1+ | Согласуется конкретное число (например 1); включаются `session_binding_token` / `session_binding_proof` на реконнектах. |
| Новый клиент | старый агент | Безопасный откат: нет обязательных новых полей в wire-format для legacy; сервер игнорирует неизвестные optional-поля (proto3). |
Важно: поведение «только новый протокол» для отдельных механизмов (например принудительная отмена стрима по неактивности) в клиенте завязано на **фактически согласованную** версию **`NegotiatedProtocol > 0`**, а не только на настройку `MaxAcceptProtocolVersion`.
---
## 2. Изменения в клиенте — исправления и защита от регрессий
Ниже — логика, связанная с новым протоколом и устойчивостью сессий (файл `client_impl.cpp`, заголовки `client.h` / `client_impl.h`).
1. **Согласование версии и биндинг** — `PrepareInitializeRequest` выставляет `accept_protocol_version` только при `MaxAcceptProtocolVersion > 0`; при наличии согласованной версии и токена добавляет `session_binding_proof`. `OnGrpcCallInitialized` выставляет `NegotiatedProtocol` и `SessionBindingToken` только если `protocol_version` задан и **> 0**, иначе очищает их (строгий legacy).
2. **Конфликт `ALREADY_EXISTS`** — при таком статусе завершения стрима сбрасываются `SessionId`, `NegotiatedProtocol` и `SessionBindingToken`, чтобы не зациклиться на неверной паре (session_id, proof) и не провоцировать повторные конфликты на стороне агента.
3. **Watchdog неактивности gRPC (`GrpcCallInactivityTimeout`)** — принудительное закрытие активного вызова (`BeginClose(true)`) и счётчик `GrpcCallsClosedByInactivity` выполняются **только** при `NegotiatedProtocol.Defined() && *NegotiatedProtocol > 0`. Для legacy и до первого успешного `Initialized` с ненулевой версией отмена по этому таймеру **не** выполняется; таймер перепланируется как раньше. Это устраняет нежелательное принудительное реконнект-поведение на транспортах, где ранее допускалось «молчание» без отмены (см. план по этому пути).
4. **Пост-fork дочерний процесс** — сброс `SessionId`, `NegotiatedProtocol`, `SessionBindingToken` вместе с очередями, чтобы дочерний процесс не унаследовал привязку чужой сессии.
Документация в публичном API: комментарий к `SetGrpcCallInactivityTimeout` в `client.h` описывает ограничение по согласованной версии протокола.
---
*При необходимости уточнения формулы `min(accept, server_max)` на стороне агента смотрите реализацию сервера в репозитории `logbroker/unified_agent` (обработка `Initialize` в gRPC-сессии).*
commit_hash:9d5ef1cdc0faf793b4f56bfd2bafa362d7995ac5
|
| |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| |
## Проблема
После аварийного завершения агента (SIGKILL) или длительной недоступности C++ клиент Unified Agent не восстанавливал соединение сам. Сессия оставалась в состоянии «активна», сообщения накапливались в буфере (inflight), при достижении лимита начинались потери (dropped messages), и доставка не возобновлялась даже после поднятия агента — требовался перезапуск приложения-клиента.
## Как исправлено
Добавлен механизм «страховки»: если у сессии есть не подтверждённые сообщения (inflight), но по активному gRPC-стриму долго нет никакой активности (ни read, ни write, ни finish), клиент принудительно отменяет текущий вызов и запускает обычный путь реконнекта (повторные попытки установки сессии). «Длительность молчания» задаётся новым параметром; по умолчанию механизм включён (30 с) — зависание сессии никому не нужно, отключить можно явно, выставив таймаут в 0.
{% cut "Технические детали" %}
**Корневая причина:** реконнект планировался только в `OnGrpcCallFinished`, то есть после завершения активного stream-вызова. При «зависшем» транспорте (transport stall после падения агента) gRPC мог не вызывать финальный callback, `ActiveGrpcCall` оставался занятым, и новый `MakeGrpcCall` не запускался.
**Изменения в коде:**
- В `TClientParameters` добавлен параметр `GrpcCallInactivityTimeout` (по умолчанию 30 с; 0 = отключено).
- В сессии заведён периодический watchdog-таймер; при срабатывании проверяется: есть ли активный `TGrpcCall`, есть ли inflight-сообщения и не было ли активности дольше заданного таймаута. При выполнении условий вызывается принудительное закрытие текущего call (`BeginClose(true)`), что приводит к `OnGrpcCallFinished` и планированию следующего `MakeGrpcCall` по `GrpcReconnectDelay`.
- Время последней активности обновляется при любом успешном событии стрима (Accept, Read, Write, Finish, а также при инициализации сессии и при ack).
**Тестирование:** добавлен интеграционный тест `TestReconnectAfterLongStall` (агент поднимается → сессия и первая доставка → убийство агента → отправка во время даунтайма → пауза → повторный запуск агента → проверка, что inflight обнуляется и сессия переинициализируется без рестарта приложения). Регрессии по существующим reconnect-тестам не наблюдаются.
{% endcut %}
commit_hash:f55efec2cff20fa975500e73d60ee57654abb2e0
|
| |
|
|
| |
commit_hash:e5545cade7cc946c943e85c680db7c276edc48b5
|
| |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| |
## 1. Где и когда это может происходить
Гонка проявляется при **одновременном** завершении задачи (Unregister/Join) и выполнении finish-действия `CommitTimer()` в воркере. Типичные сценарии:
- **Остановка агента** — при shutdown закрываются сессии, по задачам вызывается Unregister(); в это же время воркер может доходить до finish-действий уже «уходящей» задачи.
- **Закрытие сессии/канала** — например, отключение клиента, ошибка — владелец вызывает Unregister() по задаче, в которой использовался таймер.
- **Плагины и задачи, подверженные багу:** любые, что используют `TLocalTimersQueue` (отложенные таймеры через `DelayedExecutor`). В коде UA это, в частности:
- **http_output** (`plugins/lib/http/http_sender.cpp`) — таймеры для отложенных повторов запросов и flush;
- **file_input** (`plugins/file_input/file_input.cpp`) — таймеры при работе с файлами;
- **logbroker_output_new** (`plugins/logbroker_output_new/logbroker_output_impl.h`) — таймер обновления метрик.
Во всех этих случаях задача владеет `TLocalTimersQueue`, при установке таймера в finish-действие попадает `CommitTimer()`, и при быстром Unregister() возможна гонка с Join().
---
{% cut "Зачем потоку воркера вызывать Ref()" %}
Таймер планируется **асинхронно**: `DelayedExecutor.SetTimer(Timer, triggerTime)` регистрирует callback в отдельном потоке (sleeper). Когда время наступит, callback вызовется **уже после** того, как `Run()` задачи завершился. Задача и её контекст (сессия, `TLocalTimersQueue`, `ExecutionJoiner`) должны оставаться валидными до срабатывания или отмены таймера — иначе callback приведёт к use-after-free.
**Ref()** — это «удержание» задачи: пока есть лишний Ref, `Join()` не завершится (Refs не станет 0). То есть: «задача не считается полностью завершённой, пока таймер не сработал или не был сброшен». Когда таймер сбрасывают или очередь финализируют, вызывается **UnRef()** — тогда задача может перейти в joined.
**Где именно вызывается Ref/UnRef:**
Файл `logbroker/unified_agent/common/delayed_executor.cpp`:
```cpp
void TLocalTimersQueue::CommitTimer() {
if (Queue.GetCount() > 0) {
const auto triggerTime = Top().Value();
if (!TimerTriggerTime.Defined() || ...) {
if (!TimerTriggerTime.Defined()) {
// ← ЗДЕСЬ: перед первой установкой таймера в DelayedExecutor
// держим задачу «живой» до срабатывания/сброса таймера
if (!TTaskExecutor::CurrentTaskOrDie().ExecutionJoiner().TryRef()) {
CommitTimerScheduled = false;
return;
}
}
DelayedExecutor.SetTimer(Timer, triggerTime); // асинхронный таймер
TimerTriggerTime = triggerTime;
}
} else if (TimerTriggerTime.Defined()) {
DelayedExecutor.ResetTimer(Timer);
TTaskExecutor::CurrentTaskOrDie().ExecutionJoiner().UnRef(); // таймер снят — отдаём Ref
TimerTriggerTime.Clear();
}
...
}
```
`CommitTimer()` вызывается из **finish-действия** задачи (добавляется в `EnsureCommitTimerScheduled()` → `AddFinishAction([this]() { CommitTimer(); })`), т.е. выполняется в потоке воркера после выхода из `Run()`. Пример использования таймера из кода плагина — `plugins/lib/http/http_sender.cpp`: там `LocalTimerQueue.SetTimer(request->Timer, triggerTime)` планирует отложенный повтор запроса; callback при срабатывании постит событие в сессию.
{% endcut %}
---
{% cut "Контекст: участники и суть гонки" %}
**Участники:**
- **ExecutionJoiner** (`TAsyncJoiner`) — объект с атомарным счётчиком `Refs` (начальное значение 1). Пока `Refs >= 1`, задачу считают «активной». `Join()` вызывает `UnRef()`; когда `Refs` становится 0, вызывается `Promise.SetValue()` («joined»).
- **Поток задачи (Task/Unregister)** — владелец задачи; вызывает `Unregister()` → `ExecutionJoiner_.Join()` → внутри один раз `UnRef()`.
- **Поток таймера (Worker/Timer)** — воркер пула задач; выполняет finish-действия задачи. Одно из них — `CommitTimer()`, которое при первой установке таймера вызывало `ExecutionJoiner().Ref()`.
**Гонка:** между моментом, когда поток задачи делает `Join()` (и доводит `Refs` до 0), и моментом, когда поток воркера выполняет `CommitTimer()` и вызывает `Ref()`. Если `Ref()` вызывается уже после перехода в «joined», `fetch_add(1)` возвращает 0 и срабатывает `Y_ABORT_UNLESS(result >= 1, "already joined")`.
{% endcut %}
---
{% cut "До фикса: креш при гонке" %}
Поток задачи вызывает `Unregister()` и ждёт `Join()`. Поток воркера после завершения `Run()` выполняет finish-действие `CommitTimer()`. Если к этому моменту `Join()` уже выполнил `UnRef()` и `Refs == 0`, вызов `Ref()` в `CommitTimer()` приводит к падению.
```mermaid
sequenceDiagram
participant TaskThread as Поток задачи
participant Joiner as ExecutionJoiner
participant WorkerThread as Поток воркера
Note over TaskThread,WorkerThread: Задача с таймером: Run() вызвал SetTimer(), в finish-действия добавлен CommitTimer()
TaskThread->>TaskThread: Unregister()
TaskThread->>Joiner: Join()
Joiner->>Joiner: UnRef() → Refs = 0
Joiner->>Joiner: Promise.SetValue() — joined
WorkerThread->>WorkerThread: Выполняет finish-действия
WorkerThread->>WorkerThread: CommitTimer()
WorkerThread->>Joiner: Ref()
Joiner->>Joiner: fetch_add(1) → result = 0
Joiner->>WorkerThread: Y_ABORT_UNLESS(result >= 1) — CRASH
```
**Итог:** в момент вызова `Ref()` в `CommitTimer()` объект уже в состоянии «joined» (`Refs == 0`), проверка в `Ref()` не выполняется → **SIGABRT**.
{% endcut %}
---
{% cut "После фикса: корректный выход без креша" %}
В `CommitTimer()` вместо `Ref()` вызывается `TryRef()`: атомарно проверяется `Refs >= 1` (через CAS); если уже 0, `TryRef()` возвращает `false` и `CommitTimer()` сразу выходит, не вызывая `Ref()` и не трогая таймер.
```mermaid
sequenceDiagram
participant TaskThread as Поток задачи
participant Joiner as ExecutionJoiner
participant WorkerThread as Поток воркера
Note over TaskThread,WorkerThread: Та же гонка: Join() и CommitTimer() выполняются почти одновременно
TaskThread->>TaskThread: Unregister()
TaskThread->>Joiner: Join()
Joiner->>Joiner: UnRef() → Refs = 0
Joiner->>Joiner: Promise.SetValue() — joined
WorkerThread->>WorkerThread: Выполняет finish-действия
WorkerThread->>WorkerThread: CommitTimer()
WorkerThread->>Joiner: TryRef()
Joiner->>Joiner: load(Refs) = 0 → current < 1
Joiner->>WorkerThread: return false
WorkerThread->>WorkerThread: CommitTimerScheduled = false return
Note over WorkerThread: Таймер не ставится, креша нет
```
**Итог:** при уже «joined» состоянии `TryRef()` возвращает `false`, `CommitTimer()` завершается без вызова `Ref()` и без падения.
{% endcut %}
---
{% cut "Сводка изменений" %}
| Место | До фикса | После фикса |
|-------|----------|-------------|
| `CommitTimer()` при первой установке таймера | `Ref()` → при Refs=0 креш | `TryRef()` → при `false` ранний выход |
| `TAsyncJoiner` | Только `Ref()` / `UnRef()` | Добавлен `TryRef()` (CAS, при refs<1 возврат false) |
| `Finalize()` | Не сбрасывал активный таймер | При `TimerTriggerTime.Defined()` — `ResetTimer`, `UnRef()`, `Clear()` до `Finalized = true` |
Тест `TestTimerQueueUnregisterNoCrash` (500 итераций: задача с таймером → Pulse → Unregister) без фикса периодически воспроизводит креш; с фиксом — стабильно зелёный.
{% endcut %}
commit_hash:5f57d88fc53f44db31e87deaeca57a7e9ef262ca
|
| |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| |
## Абстракция управления executor’ами (TExecutorSet, TExecutorSetManager)
### Что сделано
Вводится единый слой для работы с пулами потоков agent’а: main, background и IO.
**TExecutorSet** — представление набора executor’ов:
- `Main()`, `Background()`, `IO()` — одинаковый интерфейс для всех трёх пулов
- IO возвращает тот же объект, что и Main, когда `separate_io_thread_pool == 0`, иначе — отдельный executor
**TExecutorSetManager** — создание и владение executor’ами:
- `GetExecutors(EExecutorPool pool)` — возвращает нужный набор; пока используются только `Global` и `Metrics` (оба отдают один и тот же набор)
- `Start()`, `Stop()` — жизненный цикл
- Main создаётся сразу, Background и IO — лениво при первом обращении
**Рефакторинг TAgent**:
- Вместо отдельных полей `TaskExecutor`, `BackgroundTaskExecutor`, `IOTaskExecutor` и `DelayedExecutor` используется `ExecutorSetManager`
- Обращения к executor’ам идут через `ExecutorSetManager->GetExecutors(EExecutorPool::Global)`
- FSNotifier, MemoryQuotaManager, AgentContext, CountersUpdater работают с глобальными executor’ами
**RegisterCountersWith**:
- Регистрирует счётчики main, background, IO (если включён отдельный IO-пул) и delayed
- Раньше IO-счётчики не учитывались — это исправлено
**Тесты**:
- Добавлен gtest-модуль `executor_set_tests` в `tests/gtests/`:
- Проверяются GetExecutors, доступность Main, ленивая инициализация Background
- Поведение IO при `separate_io_thread_pool == 0` и `> 0`
- Start/Stop и параметр pool в GetExecutors
### Цель
Подготовка к PR1: выделение отдельных пулов для pipelines. Сейчас меняется только абстракция, без изменения конфигурации и без добавления dedicated pools.
commit_hash:0f1b2205c474db1b601a72f063916333a74882d6
|
| |
|
|
|
| |
_Все вызовы вида YLOG\*\_F(...) заменены на YLOG\*_(...) по всему дереву unified\_agent (например, YLOG\_DEBUG\_F → YLOG\_DEBUG, YLOG\_ERR\_F → YLOG\_ERROR).
commit_hash:483c6f5cb6db2d44e9d71b427a697dc2850cba1f
|
| |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| |
# Улучшения библиотеки логирования и переход на троттлинг логов
## Описание
Этот PR содержит улучшения системы логирования unified\_agent с акцентом на предотвращение флуда логов и добавление метрик логирования.
## Основные изменения
### 1. Улучшения библиотеки логирования (`library/cpp/unified_agent_client/logger`)
- **Встроенный троттлинг логов**: Добавлена поддержка ограничения частоты логирования на уровне библиотеки
- Новые макросы `YLOG_*_T` с автоматическим троттлингом (20 логов на 10 секунд по умолчанию)
- Независимый троттлинг для каждой точки логирования (по `__FILE__:__LINE__`)
- Автоматический подсчет подавленных сообщений с выводом `[+N suppressed]`
- **Метрики логирования**: Добавлены счетчики для мониторинга активности логирования
- `RecordsReceived` - общее количество попыток логирования
- `RecordsDropped` - количество подавленных сообщений из-за троттлинга
- Счетчики передаются через `TLogger::TCounters` при создании логгера
- **Оптимизация производительности**:
- Использование `GetCycleCount()` для быстрого получения времени (вместо системных вызовов)
- Relaxed memory ordering для атомарных операций (достаточно для троттлинга)
- Минимальные накладные расходы при отключенном логировании
### 2. Переход всех логов агента на троттлинг
- **Унификация макросов**: Все макросы `YLOG_*` в `logbroker/unified_agent/common/util/logger.h` теперь используют троттлинг
- `YLOG_DEBUG`, `YLOG_INFO`, `YLOG_WARNING`, `YLOG_ERROR` и т.д. теперь автоматически применяют троттлинг
- Старые макросы `YLOG_*_F` теперь алиасы для новых троттлированных версий
- Обратная совместимость полностью сохранена
- **Обновление документации**: Файл `for_ai_cpp.md` обновлен с новыми рекомендациями по логированию
### 3. Интеграция метрик логирования в телеметрию
- **Новые счетчики в `TAgentLogCounters`**:
- `RecordsReceived` - rate метрика `agent.log.records_received`
- `RecordsDropped` - rate метрика `agent.log.records_dropped`
- **Экспорт метрик**: Метрики логирования автоматически собираются и отправляются в телеметрию агента
- **Рефакторинг конструктора `TAgent`**:
- Упрощена передача счетчиков через структуру `TAgent::TCounters`
- Счетчики логирования передаются в `TLogger` при инициализации
### 4. Тесты
- **Перенос тестов**: Тесты логирования перемещены из `logbroker/unified_agent/tests/gtests/logger_tests` в `library/cpp/unified_agent_client/ut`
- **Расширенное покрытие**:
- Тесты базового троттлинга
- Тесты счетчика подавленных сообщений
- Тесты независимого троттлинга для разных точек логирования
- Тесты форматирования сообщений
- Тесты счетчиков метрик
### 5. Исправления и улучшения
- **Удаление дублирования кода**: Логика троттлинга теперь находится только в `library/cpp/unified_agent_client/logger.cpp`
- **Упрощение API**: Удален отдельный файл `logbroker/unified_agent/common/util/logger.cpp`
- **Обновление импортов**: Все файлы обновлены для использования правильных заголовочных файлов
## Преимущества
1. **Защита от флуда логов**: Автоматическое ограничение частоты логирования предотвращает переполнение логов
2. **Наблюдаемость**: Метрики логирования позволяют отслеживать активность и проблемы с логированием
3. **Производительность**: Минимальные накладные расходы благодаря оптимизированной реализации
4. **Простота использования**: Троттлинг работает автоматически, не требует изменений в коде
5. **Обратная совместимость**: Все существующие макросы продолжают работать
## Тестирование
- ✅ Все unit-тесты логирования проходят
- ✅ Integration тесты обновлены (исключение нестабильной метрики `RecordsReceived` из сравнений)
- ✅ Проверена работа троттлинга в реальных условиях
commit_hash:75fc97a8576114446bfb9ec11efbb80df322e443
|
| |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| |
### Добавлен троттлинг логов для предотвращения флуда
**Основная цель:** Защита от переполнения логов повторяющимися сообщениями.
#### Что добавлено:
1. **Механизм троттлинга логов** ([`common/util/logger.h`](common/util/logger.h), [`common/util/logger.cpp`](common/util/logger.cpp))
- Ограничение: по умолчанию максимум 20 сообщений за 10 секунд для каждой точки логирования
- Независимый троттлинг для каждого места вызова `YLOG` (через `static` переменную состояния)
- Подсчёт пропущенных сообщений с выводом в формате `[+N suppressed]`
- Минимальные накладные расходы: использует `GetCycleCount()` вместо системных вызовов времени
- Атомарные операции для потокобезопасности
2. **Настраиваемый троттлинг**
- Макрос `YLOG_THROTTLED(logger, priority, maxLogs, intervalSec, ...)` для кастомных лимитов
- Макросы `YLOG_THROTTLE_MAX_LOGS` и `YLOG_THROTTLE_INTERVAL_SECS` для глобальной настройки (например, в тестах)
3. **Поддержка C++20 `std::format`**
- Макросы `YLOG_*` теперь поддерживают форматирование в стиле C++20: `YLOG_ERR("value: {}", x)`
- Обратная совместимость с `YLOG_*_F` макросами сохранена
- Специальная обработка `TStringBuilder` для избежания проблем с временными объектами
4. **Тесты** ([`tests/gtests/logger_tests/`](tests/gtests/logger_tests/))
- Проверка базового троттлинга
- Проверка счётчика пропущенных сообщений
- Проверка независимости троттлинга для разных точек логирования
- Проверка форматирования сообщений
#### Технические изменения:
**Изменён порядок аргументов в макросе `YLOG`:**
- Было: `YLOG(priority, message, logger)`
- Стало: `YLOG(logger, priority, message, ...)`
Это изменение потребовало обновления всех вызовов `YLOG` в кодовой базе (~30 файлов), но обеспечивает:
- Единообразие с другими логирующими библиотеками
- Возможность использования variadic arguments для форматирования
- Правильную работу с `const auto&` для продления времени жизни временных объектов
#### Примеры использования:
```cpp
// Базовое использование (троттлинг 20 логов/10 сек)
YLOG_ERR("Error occurred");
YLOG_INFO("Processing item: {}", itemId);
```
#### Влияние на производительность:
- Минимальные накладные расходы: ~10-20 наносекунд на вызов (использование CPU cycles вместо системного времени)
- Атомарные операции только при необходимости логирования
- Отсутствие дополнительных (по сравнению с текущим кодом) аллокаций памяти в hot path
#### Обратная совместимость:
- Все существующие `YLOG_*` и `YLOG_*_F` макросы работают как раньше
- Добавлен троттлинг по умолчанию для всех логов (может потребовать внимания при отладке)
commit_hash:d6e98b82486b99b945a1ba264fb2da969db4fcfc
|
| |
|
|
|
|
|
|
|
|
|
|
|
|
| |
## Problem
Log spam with repeated ``'max inflight of [***] bytes reached, [***] bytes dropped'`` errors appearing multiple times within the same second, causing log flooding.
## Solution
Implemented one-second throttling mechanism for this specific error message to prevent log spam while maintaining accurate counter tracking.
## Testing
- Added `TestMaxInflightBytesThrottling` integration test
- Confirms throttling limits log entries to ≤3 for 50 rapid message drops (vs 50 without throttling)
- Ensures all dropped message counters remain accurate regardless of throttling
commit_hash:58f44ca8ce2b8b416586f8ca7a3d3ca971f1e9cb
|
| |
|
|
|
|
|
|
|
|
|
|
|
|
|
| |
## Summary
Eliminated all dangerous c_str() patterns from unified_agent by migrating to modern YLOG_*_F formatting.
## Changes
- Remove .c_str() on temporaries in YLOG_*_F macros (1 fix)
- Migrate YLOG_*(Sprintf(...)) to modern YLOG_*_F(...) (23 fixes)
## Benefits
- Modern C++20 type-safe formatting with std::format
- Cleaner, more maintainable code (40% less in some cases)
- Better performance (no Sprintf overhead)
commit_hash:ee3f665ccc823d9e2165a90c036e5bd887cd179e
|
| |
|
|
|
|
|
| |
See full description in separate comment.
The open source part is to add implementation for formatter specialization for TString.
commit_hash:9501df4a7287050b72162b80823d5cbbd1d7464f
|
| |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| |
**Summary**
Реализован новый входной плагин `agent_logs` для отправки логов самого агента в пайплайн unified\_agent. Это решает проблему доступа к логам агента в production окружении, где требовались права System developer для доступа к файлам на дисках.
## **What's Changed** {#whats-changed}
### **New Plugin: `agent_logs_input`** {#new-plugin-agent_logs_input}
Добавлен новый входной плагин, который:
* Перехватывает логи агента через кастомный `TLogBackend`
* Отправляет логи в пайплайн стриммингово (не пулингом)
* Поддерживает фильтрацию по уровню логов (настройка **level**, по умолчанию `NOTICE`)
### **Implementation Details** {#implementation-details}
**Основные компоненты:**
1. **TAgentLogsBackend** - кастомный log backend:
* Наследуется от `TLogBackend`
* Перехватывает все логи агента через метод `WriteData()`
* Фильтрует логи по приоритету
* Отправляет отфильтрованные логи в `IMessageConsumer`
* Поддерживает graceful stop
2. **TPlugin** - основной класс плагина:
* Управляет lifecycle плагина
* Создает сессию с ID `agent_logs`
* Передает конфигурацию в session handler
3. **TSessionHandler** - обработчик сессии:
* Создает и регистрирует `TAgentLogsBackend`
* Добавляет backend в глобальный logger через `AddLog()`
* Управляет остановкой backend'а
**Пример конфигурации:**
```
routes:
- input:
plugin: agent_logs
config:
level: INFO
channel:
pipe:
- filter:
plugin: format_otel_logs
output:
# https://nda.ya.ru/t/YexxO8Qf7NZMzy
plugin: otel_logs
config:
url: "collector.logs.yandex-team.ru:443"
project: hobbit_test_project
service: logs
# https://nda.ya.ru/t/o4MQT5rI7NZMzz
oauth:
secret:
env: TEST_LOGS_OAUTH
agent_log:
priority: INFO
rate_limit_bytes: 100000
```
commit_hash:c6f115ab98c30a47845ce9f9f8a58ef89973d422
|
| |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| |
Проблема:
Сегментация через ~42 секунды после запуска UA в ServiceConfigParser::GetParserIndex("retry").
Причина:
Static Initialization Order Fiasco + Lazy Initialization в gRPC CoreConfiguration.
RetryFilter пытается получить парсер до того, как CoreConfiguration зарегистрировала его.
Решение:
1. Устанавливаем лимиты потоков ДО grpc_init() (статические параметры)
2. Явно вызываем grpc_init() для гарантии выполнения всех статических инициализаторов
3. Принудительно инициализируем CoreConfiguration через CoreConfiguration::Get()
Это гарантирует порядок:
- Настройка лимитов потоков
- Инициализация библиотеки gRPC
- Построение CoreConfiguration с регистрацией всех парсеров
- Только после этого фильтры могут безопасно использовать retry парсер
Изменения:
- library/cpp/unified_agent_client/grpc_io.cpp: Исправлена EnsureGrpcConfigured()
- tests/ut/grpc_init_ut.cpp: Добавлены unit тесты
- tests/grpc_init_check/: Standalone тестовая программа
- tests/ut/ya.make: Интеграция тестов в систему сборки
Все тесты прошли: 640/640 OK
commit_hash:ed4601dfe21f6dfac653dec6e9c3e535e5a0a09c
|
| |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| |
быстрой отдачи метрик в Multishard Metrics Pull
## С точки зрения пользователя
### 1. Как включать изменения
- Изменения автоматически активируются при использовании `metrics_pull_output_par` параметра
- Количество тредов в тредпуле устанавливается параметром `metrics_pull_output_threads` (по-умолчанию 1)
- Совместимо с существующими конфигурациями метрик
#### Пример конфигурации:
```yaml
experimental:
metrics_pull_output_par: true
metrics_pull_output_threads: 16
```
### 2. На что влияют изменения
- **Производительность обработки метрик:** Группировка метрик по шардам теперь происходит один раз в момент записи сообщения в хранилище
- **Масштабируемость:** Параллельная обработка команд на запись и чтение сообщений
## С точки зрения программиста
### 1. Новая архитектура и взаимодействие
#### Архитектура шардирования
```
TMultishardMetricsStoreHandler
↓
TShardedMetricsQueue
↓
TUAShardedMetricsWriter → IMultiShardEncoder
```
#### Принцип работы
1. **Шардирование:** Метрики группируются по ключу `(project, service, cluster)` из лейблов `__ua_project`, `__ua_service`, `__ua_cluster`
2. **Сортировка:** Внутри каждого сообщения метрики сортируются по ключам шардов
3. **Слияние:** При чтении используется priority queue для объединения шардов из разных сообщений
4. **Вывод:** Данные выводятся последовательно по шардам а внутри шарда в порядке добавления сообщений
### 2. Новые классы
#### `TMultishardMetricsStoreHandler`
- **Назначение:** Основной обработчик команд для шардированных метрик
- **Ключевые методы:**
- `Handle(TWriteCmd&)` - запись метрик в очередь
- `Handle(TMultishardReadCmd&)` - чтение из множественных шардов
- `Handle(TReadCmd&)` - чтение из одного шарда - не поддерживатся
- Обработчики остальных сообщений адаптированы для работы с `TShardedMetricsQueue`
- **Особенности:** Управляет сессиями и делегирует работу с данными в `TShardedMetricsQueue`
#### `TShardedMetricsQueue`
- **Назначение:** Очередь метрик с поддержкой шардирования
- **Ключевые структуры:**
```cpp
struct TShardedMessage {
std::unique_ptr<TOutputMessage> Message;
std::vector<TChunk> Chunks; // Индексы шардов
};
```
- **Алгоритм:** Сортировка метрик по шардам + priority queue для слияния при чтении
#### `IUAShardedMetricsWriter` + `TUAShardedMetricsWriter`
- **Назначение:** Интерфейс и реализация для записи шардированных метрик
- **Расширение:** Добавляет методы `OnShardBegin/End`, `OnStreamBegin/End` к базовому `IUAMetricsWriter`
- **Интеграция:** Работает с `NSolomon::NMultiShard::IMultiShardEncoder`
### 3. Рефакторинг
#### Изменения в базовых классах
- **`TMetricsStoreHandlerBase`:** Вынесен метод `GetMetricsWriterOptions()` для переиспользования
- **`TUAMetricsWriter`:** Добавлено `virtual` наследование для множественного наследования
!!! - **`TOutputItem`:** Изменена видимость конструктора (убран лишний `private`)
#### Структурные изменения
- **Новый модуль:** `metrics_multishard_pull_output` с собственными классами
- **Тесты:** Добавлены unit-тесты `sharded_metrics_queue_ut.cpp` для проверки логики шардирования, а интеграционные тесты продублированы для проверки работы в параллельном режиме
- **Сборка:** Обновлен `ya.make` для включения новых файлов
commit_hash:44618a480d29307e7131b82b19473d284ea68efa
|
| |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| |
Разломались на вызове YLOG\_DEBUG\_F с ошибкой:
```
/home/kirill-bel/arcadia/logbroker/unified_agent/common/zstd_file_stream.cpp:39:17: error: no member named 'format' in namespace 'std'
39 | YLOG_DEBUG_F("TZstdFileStream starting from frame pos = {} frame offset = {}",
| ^~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
40 | lastFramePos, lastFrameOffset);
| ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
```
В YLOG\_DEBUG\_F используется `std::format`, но хедер не включен в `library/cpp/unified_agent_client/logger.h`:
```
#define YLOG_DEBUG_F(fmt, ...) YLOG_DEBUG(std::format(fmt, __VA_ARGS__))
```
commit_hash:0a82f439ec6b8d83e2f4ac1a7d8a8c8cf29b9cc2
|
| |
|
|
|
|
|
| |
YLOG_<Type>(std::format(...)) to them
Мы часто используем std::format, готовясь к переходу с TString на std::string, но из-за этого появляется много лишнего кода. Поэтому были добавлены макросы логгера для удобства и существующий код был переведен на них
commit_hash:e564c913e0dc82817152b7a15c543a77812a1dcc
|
| |
|
|
|
| |
Убираю лишнее копирование. Актуально для REVIEW:7901279
commit_hash:79f0f8873f80375269dd5384e189ac4f3b260017
|
| |
|
|
| |
commit_hash:af6dedadd4d7fe292bcb7a8b6de366aff4e630b1
|
| |
|
|
|
|
|
| |
Add test to verify the threads limit
Try to fix; Limit number of gRPC threads.
commit_hash:d525e94e0ef397bf05d367ec9904d674c17d0d76
|
| |
|
|
|
|
|
| |
fintech, games, geobase, infra, ipreg, juggler, kernel, keyboard, laas, library, logbroker, logos, mail
If you think that this pr has broken something for you, roll it back
commit_hash:df8e48b2a4ecbbc74a504aa3ff62ebb8f464218d
|
| |
|
|
|
| |
Всегда ожидает Close event от СДК, но проверяет таймаут ожидания. Проверка активности соединения, по таймауту сделает реконект. Немного снижено потребление cpu.
4c41128e833c27cea3d1eb0a90c086d1a328f282
|
| |
|
|
|
|
| |
`include/grpc++` was the original directory name for all C++ header files but it conflicted with the naming scheme required for some build systems.
It is superseded by `include/grpcpp`.
ede5ac168419131cfa95db8f7d3cb0bf11597992
|
| |
|
|
| |
Relates: https://st.yandex-team.ru/, https://st.yandex-team.ru/
|
| |
|
|
|
| |
* Library import 8
* Add contrib/libs/cxxsupp/libcxx/include/__verbose_abort
|
| | |
|
| |
|
|
| |
Update tools: yexport, os-yexport
|
| | |
|
| |
|
|
| |
https://clubs.at.yandex-team.ru/arcadia/29404
|
| |
|
|
| |
https://clubs.at.yandex-team.ru/arcadia/29404
|
| | |
|
| | |
|
| | |
|
| | |
|
| | |
|
| |
|
|
|
|
|
|
|
|
|
|
|
|
| |
update grpc to 1.53.1
update grpcio/py3 to 1.53.1
Added patches:
22-grpc-code-output.patch - allow translation of grpc code to internal string type.
23-max-thread-limitation.patch - to provide interface for settings of thread number limit, as
grpc::DynamicThreadPool doesn't provide interface to limit thread number anymore.
24-support_for-non-abort-grpc.patch - generate exception instead of application crash
25-forkable-destruction-order.patch - correct forkable logic for TimerManager
27-skip-child-post-fork-operations.patch - allow to skip child post fork operations to exclude UB (used for unified agent only)
pr33495_fox_nested_fork.patch - fix issues with nested forks
pr33582_fork_handler.patch - disable fork handler support if it is not requested intentionally
|
| |
|
| |
unified-agent - fix static initialization order problem
|
| | |
|
| | |
|
| | |
|
| |
|
|
|
|
|
|
|
|
|
|
|
|
|
| |
update grpc to 1.50.2
update grpcio to 1.50.0
Удаленные патчи:
06-flow_control.patch - логика в upstream удалена
10-fix-crash-on-fork.patch - логика в upstream удалена
12-coverity-fix.patch - логика в upstream удалена
20-P2166-string-nullptr.patch - в upstream временный объект вместо nullptr
PR29209-fix-heap-use-after-free.patch - решение есть в upstream
Добавленные патчи:
pr33085_fix_epoll1_engine_reinit.patch
21-windows_build.patch
|
| |
|
| |
Correct order of the GRPC_ENABLE_FORK_SUPPORT set
|
| |
|
| |
Update tools: yexport
|
| | |
|
| |
|
| |
Update tools: yexport
|
| | |
|
| | |
|
| | |
|
| |
|
| |
Update tools: yexport
|
| | |
|