summaryrefslogtreecommitdiffstats
path: root/library/cpp/unified_agent_client
Commit message (Collapse)AuthorAgeFilesLines
* Intermediate changesrobot-piglet2026-05-181-0/+23
| | | | commit_hash:7218aca25ba819156cd6a364f9bd4ef8598c49ef
* gRPC inactivity watchdog only for negotiated protocol v1+andybg2026-04-244-20/+97
| | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | Unified Agent: новый протокол стриминга и изменения в клиенте Документ опирается на `library/cpp/unified_agent_client/proto/unified_agent.proto` и реализацию `TClientSession` в `client_impl.cpp`. --- ## 1. Новый протокол **Legacy** — сессия без согласования версии: в ответе `Initialized` поле `protocol_version` отсутствует или равно **0**. Поведение соответствует старому контракту до появления опциональных полей согласования. **Версионированный режим (v1 и выше)** — клиент объявляет максимально поддерживаемую версию (`accept_protocol_version`), агент возвращает **согласованную** версию `protocol_version` и опционально **opaque**-токен привязки сессии. Дальнейшие реконнекты с тем же `session_id` требуют передачи **доказательства** привязки (`session_binding_proof`), чтобы сервер мог отличить легитимное продолжение от подмены. Семантика потока данных не меняется: после `Initialized` идут `DataBatch` / `Ack`, ретраи по `seq_no` и дедупликация на стороне агента при совпадении `session_id` и повторной отправке записей. --- ### 1.2. Новые поля **Запрос `Request.Initialize`** | Поле | Тип | Назначение | |------|-----|------------| | `accept_protocol_version` | `optional uint32` | Верхняя граница версии, которую клиент готов использовать (стиль HTTP Accept). **Не задано или 0** — клиент не предлагает новый протокол (legacy-only). | | `session_binding_proof` | `bytes` | Доказательство привязки при реконнекте; используется, когда согласована версия **> 0** и у клиента есть токен от предыдущего `Initialized`. | Поля `session_id`, `meta`, `shared_secret_key` — как раньше; `session_id` при реконнекте передаётся для дедупликации. **Ответ `Response.Initialized`** | Поле | Тип | Назначение | |------|-----|------------| | `protocol_version` | `optional uint32` | Согласованная версия: **min**(accept клиента, максимум сервера). **Не задано или 0** — сессия считается **legacy**. | | `session_binding_token` | `bytes` | Непрозрачный токен: клиент обязан вернуть его в следующем `Initialize` как `session_binding_proof` при переподключении (для версий протокола **≥ 1**). | Остальное без изменений: `session_id`, `last_seq_no`. --- ### 1.3. Процесс и логика выбора версии 1. Клиент задаёт **`MaxAcceptProtocolVersion`** (в коде: `SetMaxAcceptProtocolVersion`). Значение **0** означает: поле `accept_protocol_version` в `Initialize` **не отправляется** — только legacy. 2. Если `MaxAcceptProtocolVersion > 0`, в первом (и последующих) сообщении `Initialize` выставляется `accept_protocol_version = MaxAcceptProtocolVersion`. 3. Агент вычисляет согласованную версию как **min**(`accept_protocol_version`, собственный потолок поддерживаемых версий) и возвращает её в `Initialized.protocol_version`. Если клиент не прислал accept (legacy-only) или сервер отвечает **0** / не задаёт поле — на клиенте фиксируется **legacy** (`NegotiatedProtocol` сбрасывается). 4. При успешном `Initialized` с **`protocol_version > 0`** клиент сохраняет число версии в `NegotiatedProtocol` и копирует `session_binding_token` во внутреннее состояние для следующих коннектов. #### Диаграмма обмена (Mermaid sequence) Первое подключение с согласованием версии и привязкой: ```mermaid sequenceDiagram participant C as Клиент (TClientSession) participant G as gRPC stream participant A as Unified Agent C->>G: открыть Session(stream Request / stream Response) C->>A: Request.initialize<br/>accept_protocol_version = N (если N>0)<br/>session_id (опц.)<br/>meta, shared_secret_key, … Note over A: min(N, server_max) → negotiated A->>C: Response.initialized<br/>session_id<br/>last_seq_no<br/>protocol_version = negotiated<br/>session_binding_token (opaque) C->>C: сохранить NegotiatedProtocol,<br/>SessionBindingToken, SessionId loop Данные C->>A: Request.data_batch (seq_no, payload, …) A->>C: Response.ack (seq_no) end Note over C,A: обрыв стрима → реконнект ``` Реконнект при согласованной версии **> 0**: ```mermaid sequenceDiagram participant C as Клиент participant A as Unified Agent C->>A: Request.initialize<br/>session_id = сохранённый<br/>accept_protocol_version = N<br/>session_binding_proof = прежний token<br/>… A->>C: Response.initialized<br/>session_id, last_seq_no,<br/>protocol_version, session_binding_token (может обновиться) C->>A: Request.data_batch … ``` --- ### 1.4. Восстановление сессии и обмен ключами - **Идентификатор сессии:** `session_id` приходит от сервера в `Initialized` и дальше передаётся в `Initialize` при реконнекте — основа для дедупликации и продолжения с теми же `seq_no`. - **Shared secret:** по-прежнему `shared_secret_key` в `Initialize` (если задан в параметрах клиента) — отдельный канал авторизации/проверки, не смешивается с биндингом стрима. - **Привязка сессии (protocol v1+):** после первого успешного `Initialized` с `protocol_version > 0` клиент хранит `session_binding_token`. При следующем `PrepareInitializeRequest`, если есть `NegotiatedProtocol > 0` и непустой токен, в запрос кладётся `session_binding_proof` (содержимое токена). Так сервер связывает новый gRPC-стрим с прежней логической сессией. - **Конфликт сессии:** при завершении gRPC-вызова с кодом **`ALREADY_EXISTS`** клиент **сбрасывает** `SessionId`, `NegotiatedProtocol` и `SessionBindingToken`, чтобы следующий коннект не повторял конфликтующий идентификатор и не слал устаревшее proof (см. `OnGrpcCallFinished` в `client_impl.cpp`). --- ### 1.5. Совместимость клиентов и серверов | Клиент | Сервер | Результат | |--------|--------|-----------| | `MaxAcceptProtocolVersion = 0` | любой | Поле `accept_protocol_version` не отправляется; ожидается legacy-ответ (`protocol_version` 0 / отсутствует). Биндинг по токену не используется. | | `MaxAcceptProtocolVersion > 0` | только legacy | Обычно `protocol_version` в ответе 0 или отсутствует — клиент остаётся в legacy для этой сессии. | | `MaxAcceptProtocolVersion > 0` | поддерживает v1+ | Согласуется конкретное число (например 1); включаются `session_binding_token` / `session_binding_proof` на реконнектах. | | Новый клиент | старый агент | Безопасный откат: нет обязательных новых полей в wire-format для legacy; сервер игнорирует неизвестные optional-поля (proto3). | Важно: поведение «только новый протокол» для отдельных механизмов (например принудительная отмена стрима по неактивности) в клиенте завязано на **фактически согласованную** версию **`NegotiatedProtocol > 0`**, а не только на настройку `MaxAcceptProtocolVersion`. --- ## 2. Изменения в клиенте — исправления и защита от регрессий Ниже — логика, связанная с новым протоколом и устойчивостью сессий (файл `client_impl.cpp`, заголовки `client.h` / `client_impl.h`). 1. **Согласование версии и биндинг** — `PrepareInitializeRequest` выставляет `accept_protocol_version` только при `MaxAcceptProtocolVersion > 0`; при наличии согласованной версии и токена добавляет `session_binding_proof`. `OnGrpcCallInitialized` выставляет `NegotiatedProtocol` и `SessionBindingToken` только если `protocol_version` задан и **> 0**, иначе очищает их (строгий legacy). 2. **Конфликт `ALREADY_EXISTS`** — при таком статусе завершения стрима сбрасываются `SessionId`, `NegotiatedProtocol` и `SessionBindingToken`, чтобы не зациклиться на неверной паре (session_id, proof) и не провоцировать повторные конфликты на стороне агента. 3. **Watchdog неактивности gRPC (`GrpcCallInactivityTimeout`)** — принудительное закрытие активного вызова (`BeginClose(true)`) и счётчик `GrpcCallsClosedByInactivity` выполняются **только** при `NegotiatedProtocol.Defined() && *NegotiatedProtocol > 0`. Для legacy и до первого успешного `Initialized` с ненулевой версией отмена по этому таймеру **не** выполняется; таймер перепланируется как раньше. Это устраняет нежелательное принудительное реконнект-поведение на транспортах, где ранее допускалось «молчание» без отмены (см. план по этому пути). 4. **Пост-fork дочерний процесс** — сброс `SessionId`, `NegotiatedProtocol`, `SessionBindingToken` вместе с очередями, чтобы дочерний процесс не унаследовал привязку чужой сессии. Документация в публичном API: комментарий к `SetGrpcCallInactivityTimeout` в `client.h` описывает ограничение по согласованной версии протокола. --- *При необходимости уточнения формулы `min(accept, server_max)` на стороне агента смотрите реализацию сервера в репозитории `logbroker/unified_agent` (обработка `Initialize` в gRPC-сессии).* commit_hash:9d5ef1cdc0faf793b4f56bfd2bafa362d7995ac5
* C++ client reconnect after agent SIGKILL/long outageandybg2026-04-105-0/+83
| | | | | | | | | | | | | | | | | | | | | | | | ## Проблема После аварийного завершения агента (SIGKILL) или длительной недоступности C++ клиент Unified Agent не восстанавливал соединение сам. Сессия оставалась в состоянии «активна», сообщения накапливались в буфере (inflight), при достижении лимита начинались потери (dropped messages), и доставка не возобновлялась даже после поднятия агента — требовался перезапуск приложения-клиента. ## Как исправлено Добавлен механизм «страховки»: если у сессии есть не подтверждённые сообщения (inflight), но по активному gRPC-стриму долго нет никакой активности (ни read, ни write, ни finish), клиент принудительно отменяет текущий вызов и запускает обычный путь реконнекта (повторные попытки установки сессии). «Длительность молчания» задаётся новым параметром; по умолчанию механизм включён (30 с) — зависание сессии никому не нужно, отключить можно явно, выставив таймаут в 0. {% cut "Технические детали" %} **Корневая причина:** реконнект планировался только в `OnGrpcCallFinished`, то есть после завершения активного stream-вызова. При «зависшем» транспорте (transport stall после падения агента) gRPC мог не вызывать финальный callback, `ActiveGrpcCall` оставался занятым, и новый `MakeGrpcCall` не запускался. **Изменения в коде:** - В `TClientParameters` добавлен параметр `GrpcCallInactivityTimeout` (по умолчанию 30 с; 0 = отключено). - В сессии заведён периодический watchdog-таймер; при срабатывании проверяется: есть ли активный `TGrpcCall`, есть ли inflight-сообщения и не было ли активности дольше заданного таймаута. При выполнении условий вызывается принудительное закрытие текущего call (`BeginClose(true)`), что приводит к `OnGrpcCallFinished` и планированию следующего `MakeGrpcCall` по `GrpcReconnectDelay`. - Время последней активности обновляется при любом успешном событии стрима (Accept, Read, Write, Finish, а также при инициализации сессии и при ack). **Тестирование:** добавлен интеграционный тест `TestReconnectAfterLongStall` (агент поднимается → сессия и первая доставка → убийство агента → отправка во время даунтайма → пауза → повторный запуск агента → проверка, что inflight обнуляется и сессия переинициализируется без рестарта приложения). Регрессии по существующим reconnect-тестам не наблюдаются. {% endcut %} commit_hash:f55efec2cff20fa975500e73d60ee57654abb2e0
* Drop backward compat with grpc-prevthegeorg2026-03-201-1/+1
| | | | commit_hash:e5545cade7cc946c943e85c680db7c276edc48b5
* Fix TAsyncJoiner "already joined" crashandybg2026-03-143-0/+32
| | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | ## 1. Где и когда это может происходить Гонка проявляется при **одновременном** завершении задачи (Unregister/Join) и выполнении finish-действия `CommitTimer()` в воркере. Типичные сценарии: - **Остановка агента** — при shutdown закрываются сессии, по задачам вызывается Unregister(); в это же время воркер может доходить до finish-действий уже «уходящей» задачи. - **Закрытие сессии/канала** — например, отключение клиента, ошибка — владелец вызывает Unregister() по задаче, в которой использовался таймер. - **Плагины и задачи, подверженные багу:** любые, что используют `TLocalTimersQueue` (отложенные таймеры через `DelayedExecutor`). В коде UA это, в частности: - **http_output** (`plugins/lib/http/http_sender.cpp`) — таймеры для отложенных повторов запросов и flush; - **file_input** (`plugins/file_input/file_input.cpp`) — таймеры при работе с файлами; - **logbroker_output_new** (`plugins/logbroker_output_new/logbroker_output_impl.h`) — таймер обновления метрик. Во всех этих случаях задача владеет `TLocalTimersQueue`, при установке таймера в finish-действие попадает `CommitTimer()`, и при быстром Unregister() возможна гонка с Join(). --- {% cut "Зачем потоку воркера вызывать Ref()" %} Таймер планируется **асинхронно**: `DelayedExecutor.SetTimer(Timer, triggerTime)` регистрирует callback в отдельном потоке (sleeper). Когда время наступит, callback вызовется **уже после** того, как `Run()` задачи завершился. Задача и её контекст (сессия, `TLocalTimersQueue`, `ExecutionJoiner`) должны оставаться валидными до срабатывания или отмены таймера — иначе callback приведёт к use-after-free. **Ref()** — это «удержание» задачи: пока есть лишний Ref, `Join()` не завершится (Refs не станет 0). То есть: «задача не считается полностью завершённой, пока таймер не сработал или не был сброшен». Когда таймер сбрасывают или очередь финализируют, вызывается **UnRef()** — тогда задача может перейти в joined. **Где именно вызывается Ref/UnRef:** Файл `logbroker/unified_agent/common/delayed_executor.cpp`: ```cpp void TLocalTimersQueue::CommitTimer() { if (Queue.GetCount() > 0) { const auto triggerTime = Top().Value(); if (!TimerTriggerTime.Defined() || ...) { if (!TimerTriggerTime.Defined()) { // ← ЗДЕСЬ: перед первой установкой таймера в DelayedExecutor // держим задачу «живой» до срабатывания/сброса таймера if (!TTaskExecutor::CurrentTaskOrDie().ExecutionJoiner().TryRef()) { CommitTimerScheduled = false; return; } } DelayedExecutor.SetTimer(Timer, triggerTime); // асинхронный таймер TimerTriggerTime = triggerTime; } } else if (TimerTriggerTime.Defined()) { DelayedExecutor.ResetTimer(Timer); TTaskExecutor::CurrentTaskOrDie().ExecutionJoiner().UnRef(); // таймер снят — отдаём Ref TimerTriggerTime.Clear(); } ... } ``` `CommitTimer()` вызывается из **finish-действия** задачи (добавляется в `EnsureCommitTimerScheduled()` → `AddFinishAction([this]() { CommitTimer(); })`), т.е. выполняется в потоке воркера после выхода из `Run()`. Пример использования таймера из кода плагина — `plugins/lib/http/http_sender.cpp`: там `LocalTimerQueue.SetTimer(request->Timer, triggerTime)` планирует отложенный повтор запроса; callback при срабатывании постит событие в сессию. {% endcut %} --- {% cut "Контекст: участники и суть гонки" %} **Участники:** - **ExecutionJoiner** (`TAsyncJoiner`) — объект с атомарным счётчиком `Refs` (начальное значение 1). Пока `Refs >= 1`, задачу считают «активной». `Join()` вызывает `UnRef()`; когда `Refs` становится 0, вызывается `Promise.SetValue()` («joined»). - **Поток задачи (Task/Unregister)** — владелец задачи; вызывает `Unregister()` → `ExecutionJoiner_.Join()` → внутри один раз `UnRef()`. - **Поток таймера (Worker/Timer)** — воркер пула задач; выполняет finish-действия задачи. Одно из них — `CommitTimer()`, которое при первой установке таймера вызывало `ExecutionJoiner().Ref()`. **Гонка:** между моментом, когда поток задачи делает `Join()` (и доводит `Refs` до 0), и моментом, когда поток воркера выполняет `CommitTimer()` и вызывает `Ref()`. Если `Ref()` вызывается уже после перехода в «joined», `fetch_add(1)` возвращает 0 и срабатывает `Y_ABORT_UNLESS(result >= 1, "already joined")`. {% endcut %} --- {% cut "До фикса: креш при гонке" %} Поток задачи вызывает `Unregister()` и ждёт `Join()`. Поток воркера после завершения `Run()` выполняет finish-действие `CommitTimer()`. Если к этому моменту `Join()` уже выполнил `UnRef()` и `Refs == 0`, вызов `Ref()` в `CommitTimer()` приводит к падению. ```mermaid sequenceDiagram participant TaskThread as Поток задачи participant Joiner as ExecutionJoiner participant WorkerThread as Поток воркера Note over TaskThread,WorkerThread: Задача с таймером: Run() вызвал SetTimer(), в finish-действия добавлен CommitTimer() TaskThread->>TaskThread: Unregister() TaskThread->>Joiner: Join() Joiner->>Joiner: UnRef() → Refs = 0 Joiner->>Joiner: Promise.SetValue() — joined WorkerThread->>WorkerThread: Выполняет finish-действия WorkerThread->>WorkerThread: CommitTimer() WorkerThread->>Joiner: Ref() Joiner->>Joiner: fetch_add(1) → result = 0 Joiner->>WorkerThread: Y_ABORT_UNLESS(result >= 1) — CRASH ``` **Итог:** в момент вызова `Ref()` в `CommitTimer()` объект уже в состоянии «joined» (`Refs == 0`), проверка в `Ref()` не выполняется → **SIGABRT**. {% endcut %} --- {% cut "После фикса: корректный выход без креша" %} В `CommitTimer()` вместо `Ref()` вызывается `TryRef()`: атомарно проверяется `Refs >= 1` (через CAS); если уже 0, `TryRef()` возвращает `false` и `CommitTimer()` сразу выходит, не вызывая `Ref()` и не трогая таймер. ```mermaid sequenceDiagram participant TaskThread as Поток задачи participant Joiner as ExecutionJoiner participant WorkerThread as Поток воркера Note over TaskThread,WorkerThread: Та же гонка: Join() и CommitTimer() выполняются почти одновременно TaskThread->>TaskThread: Unregister() TaskThread->>Joiner: Join() Joiner->>Joiner: UnRef() → Refs = 0 Joiner->>Joiner: Promise.SetValue() — joined WorkerThread->>WorkerThread: Выполняет finish-действия WorkerThread->>WorkerThread: CommitTimer() WorkerThread->>Joiner: TryRef() Joiner->>Joiner: load(Refs) = 0 → current < 1 Joiner->>WorkerThread: return false WorkerThread->>WorkerThread: CommitTimerScheduled = false return Note over WorkerThread: Таймер не ставится, креша нет ``` **Итог:** при уже «joined» состоянии `TryRef()` возвращает `false`, `CommitTimer()` завершается без вызова `Ref()` и без падения. {% endcut %} --- {% cut "Сводка изменений" %} | Место | До фикса | После фикса | |-------|----------|-------------| | `CommitTimer()` при первой установке таймера | `Ref()` → при Refs=0 креш | `TryRef()` → при `false` ранний выход | | `TAsyncJoiner` | Только `Ref()` / `UnRef()` | Добавлен `TryRef()` (CAS, при refs<1 возврат false) | | `Finalize()` | Не сбрасывал активный таймер | При `TimerTriggerTime.Defined()` — `ResetTimer`, `UnRef()`, `Clear()` до `Finalized = true` | Тест `TestTimerQueueUnregisterNoCrash` (500 итераций: задача с таймером → Pulse → Unregister) без фикса периодически воспроизводит креш; с фиксом — стабильно зелёный. {% endcut %} commit_hash:5f57d88fc53f44db31e87deaeca57a7e9ef262ca
* /0: Implement TExecutorSet and TExecutorSetManageriofik2026-02-252-3/+3
| | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | ## Абстракция управления executor’ами (TExecutorSet, TExecutorSetManager) ### Что сделано Вводится единый слой для работы с пулами потоков agent’а: main, background и IO. **TExecutorSet** — представление набора executor’ов: - `Main()`, `Background()`, `IO()` — одинаковый интерфейс для всех трёх пулов - IO возвращает тот же объект, что и Main, когда `separate_io_thread_pool == 0`, иначе — отдельный executor **TExecutorSetManager** — создание и владение executor’ами: - `GetExecutors(EExecutorPool pool)` — возвращает нужный набор; пока используются только `Global` и `Metrics` (оба отдают один и тот же набор) - `Start()`, `Stop()` — жизненный цикл - Main создаётся сразу, Background и IO — лениво при первом обращении **Рефакторинг TAgent**: - Вместо отдельных полей `TaskExecutor`, `BackgroundTaskExecutor`, `IOTaskExecutor` и `DelayedExecutor` используется `ExecutorSetManager` - Обращения к executor’ам идут через `ExecutorSetManager->GetExecutors(EExecutorPool::Global)` - FSNotifier, MemoryQuotaManager, AgentContext, CountersUpdater работают с глобальными executor’ами **RegisterCountersWith**: - Регистрирует счётчики main, background, IO (если включён отдельный IO-пул) и delayed - Раньше IO-счётчики не учитывались — это исправлено **Тесты**: - Добавлен gtest-модуль `executor_set_tests` в `tests/gtests/`: - Проверяются GetExecutors, доступность Main, ленивая инициализация Background - Поведение IO при `separate_io_thread_pool == 0` и `> 0` - Start/Stop и параметр pool в GetExecutors ### Цель Подготовка к PR1: выделение отдельных пулов для pipelines. Сейчас меняется только абстракция, без изменения конфигурации и без добавления dedicated pools. commit_hash:0f1b2205c474db1b601a72f063916333a74882d6
* /3: Get rid of YLOG*_F macros in favor of just YLOG*iofik2026-02-201-1/+1
| | | | | _Все вызовы вида YLOG\*\_F(...) заменены на YLOG\*_(...) по всему дереву unified\_agent (например, YLOG\_DEBUG\_F → YLOG\_DEBUG, YLOG\_ERR\_F → YLOG\_ERROR). commit_hash:483c6f5cb6db2d44e9d71b427a697dc2850cba1f
* /2: Use our internal logger header, add logging metricsiofik2026-02-196-87/+388
| | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | # Улучшения библиотеки логирования и переход на троттлинг логов ## Описание Этот PR содержит улучшения системы логирования unified\_agent с акцентом на предотвращение флуда логов и добавление метрик логирования. ## Основные изменения ### 1. Улучшения библиотеки логирования (`library/cpp/unified_agent_client/logger`) - **Встроенный троттлинг логов**: Добавлена поддержка ограничения частоты логирования на уровне библиотеки - Новые макросы `YLOG_*_T` с автоматическим троттлингом (20 логов на 10 секунд по умолчанию) - Независимый троттлинг для каждой точки логирования (по `__FILE__:__LINE__`) - Автоматический подсчет подавленных сообщений с выводом `[+N suppressed]` - **Метрики логирования**: Добавлены счетчики для мониторинга активности логирования - `RecordsReceived` - общее количество попыток логирования - `RecordsDropped` - количество подавленных сообщений из-за троттлинга - Счетчики передаются через `TLogger::TCounters` при создании логгера - **Оптимизация производительности**: - Использование `GetCycleCount()` для быстрого получения времени (вместо системных вызовов) - Relaxed memory ordering для атомарных операций (достаточно для троттлинга) - Минимальные накладные расходы при отключенном логировании ### 2. Переход всех логов агента на троттлинг - **Унификация макросов**: Все макросы `YLOG_*` в `logbroker/unified_agent/common/util/logger.h` теперь используют троттлинг - `YLOG_DEBUG`, `YLOG_INFO`, `YLOG_WARNING`, `YLOG_ERROR` и т.д. теперь автоматически применяют троттлинг - Старые макросы `YLOG_*_F` теперь алиасы для новых троттлированных версий - Обратная совместимость полностью сохранена - **Обновление документации**: Файл `for_ai_cpp.md` обновлен с новыми рекомендациями по логированию ### 3. Интеграция метрик логирования в телеметрию - **Новые счетчики в `TAgentLogCounters`**: - `RecordsReceived` - rate метрика `agent.log.records_received` - `RecordsDropped` - rate метрика `agent.log.records_dropped` - **Экспорт метрик**: Метрики логирования автоматически собираются и отправляются в телеметрию агента - **Рефакторинг конструктора `TAgent`**: - Упрощена передача счетчиков через структуру `TAgent::TCounters` - Счетчики логирования передаются в `TLogger` при инициализации ### 4. Тесты - **Перенос тестов**: Тесты логирования перемещены из `logbroker/unified_agent/tests/gtests/logger_tests` в `library/cpp/unified_agent_client/ut` - **Расширенное покрытие**: - Тесты базового троттлинга - Тесты счетчика подавленных сообщений - Тесты независимого троттлинга для разных точек логирования - Тесты форматирования сообщений - Тесты счетчиков метрик ### 5. Исправления и улучшения - **Удаление дублирования кода**: Логика троттлинга теперь находится только в `library/cpp/unified_agent_client/logger.cpp` - **Упрощение API**: Удален отдельный файл `logbroker/unified_agent/common/util/logger.cpp` - **Обновление импортов**: Все файлы обновлены для использования правильных заголовочных файлов ## Преимущества 1. **Защита от флуда логов**: Автоматическое ограничение частоты логирования предотвращает переполнение логов 2. **Наблюдаемость**: Метрики логирования позволяют отслеживать активность и проблемы с логированием 3. **Производительность**: Минимальные накладные расходы благодаря оптимизированной реализации 4. **Простота использования**: Троттлинг работает автоматически, не требует изменений в коде 5. **Обратная совместимость**: Все существующие макросы продолжают работать ## Тестирование - ✅ Все unit-тесты логирования проходят - ✅ Integration тесты обновлены (исключение нестабильной метрики `RecordsReceived` из сравнений) - ✅ Проверена работа троттлинга в реальных условиях commit_hash:75fc97a8576114446bfb9ec11efbb80df322e443
* /0: Implement logging with throttling, replace YLOG macro with a new oneiofik2026-02-151-1/+9
| | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | ### Добавлен троттлинг логов для предотвращения флуда **Основная цель:** Защита от переполнения логов повторяющимися сообщениями. #### Что добавлено: 1. **Механизм троттлинга логов** ([`common/util/logger.h`](common/util/logger.h), [`common/util/logger.cpp`](common/util/logger.cpp)) - Ограничение: по умолчанию максимум 20 сообщений за 10 секунд для каждой точки логирования - Независимый троттлинг для каждого места вызова `YLOG` (через `static` переменную состояния) - Подсчёт пропущенных сообщений с выводом в формате `[+N suppressed]` - Минимальные накладные расходы: использует `GetCycleCount()` вместо системных вызовов времени - Атомарные операции для потокобезопасности 2. **Настраиваемый троттлинг** - Макрос `YLOG_THROTTLED(logger, priority, maxLogs, intervalSec, ...)` для кастомных лимитов - Макросы `YLOG_THROTTLE_MAX_LOGS` и `YLOG_THROTTLE_INTERVAL_SECS` для глобальной настройки (например, в тестах) 3. **Поддержка C++20 `std::format`** - Макросы `YLOG_*` теперь поддерживают форматирование в стиле C++20: `YLOG_ERR("value: {}", x)` - Обратная совместимость с `YLOG_*_F` макросами сохранена - Специальная обработка `TStringBuilder` для избежания проблем с временными объектами 4. **Тесты** ([`tests/gtests/logger_tests/`](tests/gtests/logger_tests/)) - Проверка базового троттлинга - Проверка счётчика пропущенных сообщений - Проверка независимости троттлинга для разных точек логирования - Проверка форматирования сообщений #### Технические изменения: **Изменён порядок аргументов в макросе `YLOG`:** - Было: `YLOG(priority, message, logger)` - Стало: `YLOG(logger, priority, message, ...)` Это изменение потребовало обновления всех вызовов `YLOG` в кодовой базе (~30 файлов), но обеспечивает: - Единообразие с другими логирующими библиотеками - Возможность использования variadic arguments для форматирования - Правильную работу с `const auto&` для продления времени жизни временных объектов #### Примеры использования: ```cpp // Базовое использование (троттлинг 20 логов/10 сек) YLOG_ERR("Error occurred"); YLOG_INFO("Processing item: {}", itemId); ``` #### Влияние на производительность: - Минимальные накладные расходы: ~10-20 наносекунд на вызов (использование CPU cycles вместо системного времени) - Атомарные операции только при необходимости логирования - Отсутствие дополнительных (по сравнению с текущим кодом) аллокаций памяти в hot path #### Обратная совместимость: - Все существующие `YLOG_*` и `YLOG_*_F` макросы работают как раньше - Добавлен троттлинг по умолчанию для всех логов (может потребовать внимания при отладке) commit_hash:d6e98b82486b99b945a1ba264fb2da969db4fcfc
* Add throttling for max inflight error logsandybg2026-01-153-2/+70
| | | | | | | | | | | | | | ## Problem Log spam with repeated ``'max inflight of [***] bytes reached, [***] bytes dropped'`` errors appearing multiple times within the same second, causing log flooding. ## Solution Implemented one-second throttling mechanism for this specific error message to prevent log spam while maintaining accurate counter tracking. ## Testing - Added `TestMaxInflightBytesThrottling` integration test - Confirms throttling limits log entries to ≤3 for 50 rapid message drops (vs 50 without throttling) - Ensures all dropped message counters remain accurate regardless of throttling commit_hash:58f44ca8ce2b8b416586f8ca7a3d3ca971f1e9cb
* fix: Migrate to YLOG_*_F and eliminate dangerous c_str() patternsandybg2025-11-291-0/+12
| | | | | | | | | | | | | | | ## Summary Eliminated all dangerous c_str() patterns from unified_agent by migrating to modern YLOG_*_F formatting. ## Changes - Remove .c_str() on temporaries in YLOG_*_F macros (1 fix) - Migrate YLOG_*(Sprintf(...)) to modern YLOG_*_F(...) (23 fixes) ## Benefits - Modern C++20 type-safe formatting with std::format - Cleaner, more maintainable code (40% less in some cases) - Better performance (no Sprintf overhead) commit_hash:ee3f665ccc823d9e2165a90c036e5bd887cd179e
* - Global Storage Limit Featureandybg2025-11-282-0/+30
| | | | | | | See full description in separate comment. The open source part is to add implementation for formatter specialization for TString. commit_hash:9501df4a7287050b72162b80823d5cbbd1d7464f
* Create new plugin agent_logs_inputhobbit2025-11-252-0/+14
| | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | **Summary** Реализован новый входной плагин `agent_logs` для отправки логов самого агента в пайплайн unified\_agent. Это решает проблему доступа к логам агента в production окружении, где требовались права System developer для доступа к файлам на дисках. ## **What's Changed** {#whats-changed} ### **New Plugin: `agent_logs_input`** {#new-plugin-agent_logs_input} Добавлен новый входной плагин, который: * Перехватывает логи агента через кастомный `TLogBackend` * Отправляет логи в пайплайн стриммингово (не пулингом) * Поддерживает фильтрацию по уровню логов (настройка **level**, по умолчанию `NOTICE`) ### **Implementation Details** {#implementation-details} **Основные компоненты:** 1. **TAgentLogsBackend** - кастомный log backend: * Наследуется от `TLogBackend` * Перехватывает все логи агента через метод `WriteData()` * Фильтрует логи по приоритету * Отправляет отфильтрованные логи в `IMessageConsumer` * Поддерживает graceful stop 2. **TPlugin** - основной класс плагина: * Управляет lifecycle плагина * Создает сессию с ID `agent_logs` * Передает конфигурацию в session handler 3. **TSessionHandler** - обработчик сессии: * Создает и регистрирует `TAgentLogsBackend` * Добавляет backend в глобальный logger через `AddLog()` * Управляет остановкой backend'а **Пример конфигурации:** ``` routes: - input: plugin: agent_logs config: level: INFO channel: pipe: - filter: plugin: format_otel_logs output: # https://nda.ya.ru/t/YexxO8Qf7NZMzy plugin: otel_logs config: url: "collector.logs.yandex-team.ru:443" project: hobbit_test_project service: logs # https://nda.ya.ru/t/o4MQT5rI7NZMzz oauth: secret: env: TEST_LOGS_OAUTH agent_log: priority: INFO rate_limit_bytes: 100000 ``` commit_hash:c6f115ab98c30a47845ce9f9f8a58ef89973d422
* Fix gRPC retry parser initialization race conditionandybg2025-11-231-2/+11
| | | | | | | | | | | | | | | | | | | | | | | | | | | | | Проблема: Сегментация через ~42 секунды после запуска UA в ServiceConfigParser::GetParserIndex("retry"). Причина: Static Initialization Order Fiasco + Lazy Initialization в gRPC CoreConfiguration. RetryFilter пытается получить парсер до того, как CoreConfiguration зарегистрировала его. Решение: 1. Устанавливаем лимиты потоков ДО grpc_init() (статические параметры) 2. Явно вызываем grpc_init() для гарантии выполнения всех статических инициализаторов 3. Принудительно инициализируем CoreConfiguration через CoreConfiguration::Get() Это гарантирует порядок: - Настройка лимитов потоков - Инициализация библиотеки gRPC - Построение CoreConfiguration с регистрацией всех парсеров - Только после этого фильтры могут безопасно использовать retry парсер Изменения: - library/cpp/unified_agent_client/grpc_io.cpp: Исправлена EnsureGrpcConfigured() - tests/ut/grpc_init_ut.cpp: Добавлены unit тесты - tests/grpc_init_check/: Standalone тестовая программа - tests/ut/ya.make: Интеграция тестов в систему сборки Все тесты прошли: 640/640 OK commit_hash:ed4601dfe21f6dfac653dec6e9c3e535e5a0a09c
* /1: Специализированный Multishard Metrics Store для ↵iofik2025-10-021-5/+5
| | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | быстрой отдачи метрик в Multishard Metrics Pull ## С точки зрения пользователя ### 1. Как включать изменения - Изменения автоматически активируются при использовании `metrics_pull_output_par` параметра - Количество тредов в тредпуле устанавливается параметром `metrics_pull_output_threads` (по-умолчанию 1) - Совместимо с существующими конфигурациями метрик #### Пример конфигурации: ```yaml experimental: metrics_pull_output_par: true metrics_pull_output_threads: 16 ``` ### 2. На что влияют изменения - **Производительность обработки метрик:** Группировка метрик по шардам теперь происходит один раз в момент записи сообщения в хранилище - **Масштабируемость:** Параллельная обработка команд на запись и чтение сообщений ## С точки зрения программиста ### 1. Новая архитектура и взаимодействие #### Архитектура шардирования ``` TMultishardMetricsStoreHandler ↓ TShardedMetricsQueue ↓ TUAShardedMetricsWriter → IMultiShardEncoder ``` #### Принцип работы 1. **Шардирование:** Метрики группируются по ключу `(project, service, cluster)` из лейблов `__ua_project`, `__ua_service`, `__ua_cluster` 2. **Сортировка:** Внутри каждого сообщения метрики сортируются по ключам шардов 3. **Слияние:** При чтении используется priority queue для объединения шардов из разных сообщений 4. **Вывод:** Данные выводятся последовательно по шардам а внутри шарда в порядке добавления сообщений ### 2. Новые классы #### `TMultishardMetricsStoreHandler` - **Назначение:** Основной обработчик команд для шардированных метрик - **Ключевые методы:** - `Handle(TWriteCmd&)` - запись метрик в очередь - `Handle(TMultishardReadCmd&)` - чтение из множественных шардов - `Handle(TReadCmd&)` - чтение из одного шарда - не поддерживатся - Обработчики остальных сообщений адаптированы для работы с `TShardedMetricsQueue` - **Особенности:** Управляет сессиями и делегирует работу с данными в `TShardedMetricsQueue` #### `TShardedMetricsQueue` - **Назначение:** Очередь метрик с поддержкой шардирования - **Ключевые структуры:** ```cpp struct TShardedMessage { std::unique_ptr<TOutputMessage> Message; std::vector<TChunk> Chunks; // Индексы шардов }; ``` - **Алгоритм:** Сортировка метрик по шардам + priority queue для слияния при чтении #### `IUAShardedMetricsWriter` + `TUAShardedMetricsWriter` - **Назначение:** Интерфейс и реализация для записи шардированных метрик - **Расширение:** Добавляет методы `OnShardBegin/End`, `OnStreamBegin/End` к базовому `IUAMetricsWriter` - **Интеграция:** Работает с `NSolomon::NMultiShard::IMultiShardEncoder` ### 3. Рефакторинг #### Изменения в базовых классах - **`TMetricsStoreHandlerBase`:** Вынесен метод `GetMetricsWriterOptions()` для переиспользования - **`TUAMetricsWriter`:** Добавлено `virtual` наследование для множественного наследования !!! - **`TOutputItem`:** Изменена видимость конструктора (убран лишний `private`) #### Структурные изменения - **Новый модуль:** `metrics_multishard_pull_output` с собственными классами - **Тесты:** Добавлены unit-тесты `sharded_metrics_queue_ut.cpp` для проверки логики шардирования, а интеграционные тесты продублированы для проверки работы в параллельном режиме - **Сборка:** Обновлен `ya.make` для включения новых файлов commit_hash:44618a480d29307e7131b82b19473d284ea68efa
* Added std::format include into unified agent loggerkirill-bel2025-09-171-0/+1
| | | | | | | | | | | | | | | | | | | Разломались на вызове YLOG\_DEBUG\_F с ошибкой: ``` /home/kirill-bel/arcadia/logbroker/unified_agent/common/zstd_file_stream.cpp:39:17: error: no member named 'format' in namespace 'std' 39 | YLOG_DEBUG_F("TZstdFileStream starting from frame pos = {} frame offset = {}", | ^~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 40 | lastFramePos, lastFrameOffset); | ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ ``` В YLOG\_DEBUG\_F используется `std::format`, но хедер не включен в `library/cpp/unified_agent_client/logger.h`: ``` #define YLOG_DEBUG_F(fmt, ...) YLOG_DEBUG(std::format(fmt, __VA_ARGS__)) ``` commit_hash:0a82f439ec6b8d83e2f4ac1a7d8a8c8cf29b9cc2
* Added macros for std::format log and changed existing ↵deruzh2025-05-281-0/+11
| | | | | | | YLOG_<Type>(std::format(...)) to them Мы часто используем std::format, готовясь к переходу с TString на std::string, но из-за этого появляется много лишнего кода. Поэтому были добавлены макросы логгера для удобства и существующий код был переведен на них commit_hash:e564c913e0dc82817152b7a15c543a77812a1dcc
* Extra copynae2022025-02-251-4/+4
| | | | | Убираю лишнее копирование. Актуально для REVIEW:7901279 commit_hash:79f0f8873f80375269dd5384e189ac4f3b260017
* Add UnifiedAgentWriterFactorylo-r-d2025-01-203-0/+108
| | | | commit_hash:af6dedadd4d7fe292bcb7a8b6de366aff4e630b1
* Limit the number of gRPC threadsandybg2024-12-101-0/+2
| | | | | | | Add test to verify the threads limit Try to fix; Limit number of gRPC threads. commit_hash:d525e94e0ef397bf05d367ec9904d674c17d0d76
* Use lower case for TString methods Data, Size and Empty in extsearch, ↵mikhnenko2024-10-105-8/+8
| | | | | | | fintech, games, geobase, infra, ipreg, juggler, kernel, keyboard, laas, library, logbroker, logos, mail If you think that this pr has broken something for you, roll it back commit_hash:df8e48b2a4ecbbc74a504aa3ff62ebb8f464218d
* logbroker out: use CloseHandleraleksei-le2024-08-181-0/+7
| | | | | Всегда ожидает Close event от СДК, но проверяет таймаут ожидания. Проверка активности соединения, по таймауту сделает реконект. Немного снижено потребление cpu. 4c41128e833c27cea3d1eb0a90c086d1a328f282
* Modernize superseded grpc includesthegeorg2024-04-121-1/+1
| | | | | | `include/grpc++` was the original directory name for all C++ header files but it conflicted with the naming scheme required for some build systems. It is superseded by `include/grpcpp`. ede5ac168419131cfa95db8f7d3cb0bf11597992
* feat contrib: aiogram 3armenqa2024-01-1912-734/+0
| | | | Relates: https://st.yandex-team.ru/, https://st.yandex-team.ru/
* Library import 8 (#1074)AlexSm2024-01-181-2/+2
| | | | | * Library import 8 * Add contrib/libs/cxxsupp/libcxx/include/__verbose_abort
* Library import 7 (#937)AlexSm2024-01-111-1/+1
|
* External build system generator release 65robot-ya-builder2023-12-052-6/+6
| | | | Update tools: yexport, os-yexport
* add darwin-arm64 CMakeListsdcherednik2023-11-204-0/+142
|
* Y_FAIL->Y_ABORT at '^li'ilnurkh2023-10-171-1/+1
| | | | https://clubs.at.yandex-team.ru/arcadia/29404
* Y_VERIFY->Y_ABORT_UNLESS at ^lilnurkh2023-10-096-33/+33
| | | | https://clubs.at.yandex-team.ru/arcadia/29404
* Fix input variable missprintsvidyuk2023-08-308-0/+136
|
* All .ll files support in LLVM_BCsvidyuk2023-08-308-136/+0
|
* sample_by_rate filter: add volume limit on data stream in filtergitnab2023-08-291-0/+2
|
* ydb: support go code in OSSuzhas2023-08-241-1/+1
|
* unified-agent: add log for SeqNoaleksei-le2023-08-201-1/+1
|
* feat grpc: update to grpc 1.53.1leonidlazarev2023-07-172-2/+2
| | | | | | | | | | | | | | update grpc to 1.53.1 update grpcio/py3 to 1.53.1 Added patches: 22-grpc-code-output.patch - allow translation of grpc code to internal string type. 23-max-thread-limitation.patch - to provide interface for settings of thread number limit, as grpc::DynamicThreadPool doesn't provide interface to limit thread number anymore. 24-support_for-non-abort-grpc.patch - generate exception instead of application crash 25-forkable-destruction-order.patch - correct forkable logic for TimerManager 27-skip-child-post-fork-operations.patch - allow to skip child post fork operations to exclude UB (used for unified agent only) pr33495_fox_nested_fork.patch - fix issues with nested forks pr33582_fork_handler.patch - disable fork handler support if it is not requested intentionally
* unified-agent - fix static initialization order problemgrebelnik2023-07-131-5/+8
| | | unified-agent - fix static initialization order problem
* client-cpp for unified-agent: changing log levelaleksei-le2023-07-031-18/+18
|
* [unified agent] fix printf specifiers for 32 bit arch /gluk472023-06-261-13/+13
|
* add ymake export to ydbalexv-smirnov2023-06-134-0/+66
|
* feat grpc: update to grpc 1.50.2leonidlazarev2023-06-021-1/+2
| | | | | | | | | | | | | | | update grpc to 1.50.2 update grpcio to 1.50.0 Удаленные патчи: 06-flow_control.patch - логика в upstream удалена 10-fix-crash-on-fork.patch - логика в upstream удалена 12-coverity-fix.patch - логика в upstream удалена 20-P2166-string-nullptr.patch - в upstream временный объект вместо nullptr PR29209-fix-heap-use-after-free.patch - решение есть в upstream Добавленные патчи: pr33085_fix_epoll1_engine_reinit.patch 21-windows_build.patch
* feat grpc: correct order of the fork variable setleonidlazarev2023-05-221-1/+10
| | | Correct order of the GRPC_ENABLE_FORK_SUPPORT set
* External build system generator release 29robot-ya-builder2023-04-192-2/+2
| | | Update tools: yexport
* Revert ymake build from ydb oss exportalexv-smirnov2023-03-284-66/+0
|
* External build system generator release 21robot-ya-builder2023-03-172-6/+6
| | | Update tools: yexport
* add library/cpp/actors, ymake build to ydb oss exportalexv-smirnov2023-03-154-0/+66
|
* Intermediate changesrobot-piglet2023-03-094-0/+142
|
* Intermediate changesrobot-piglet2023-03-076-4/+4
|
* External build system generator release 17robot-ya-builder2023-03-068-2/+92
| | | Update tools: yexport
* separate counters for each filevalerybitsoev2023-02-281-0/+24
|