| Commit message (Collapse) | Author | Age | Files | Lines |
| ... | |
| |
|
|
| |
commit_hash:24a87198c8fc53da983f9678679b990070e329b8
|
| |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| |
## Проблема
После аварийного завершения агента (SIGKILL) или длительной недоступности C++ клиент Unified Agent не восстанавливал соединение сам. Сессия оставалась в состоянии «активна», сообщения накапливались в буфере (inflight), при достижении лимита начинались потери (dropped messages), и доставка не возобновлялась даже после поднятия агента — требовался перезапуск приложения-клиента.
## Как исправлено
Добавлен механизм «страховки»: если у сессии есть не подтверждённые сообщения (inflight), но по активному gRPC-стриму долго нет никакой активности (ни read, ни write, ни finish), клиент принудительно отменяет текущий вызов и запускает обычный путь реконнекта (повторные попытки установки сессии). «Длительность молчания» задаётся новым параметром; по умолчанию механизм включён (30 с) — зависание сессии никому не нужно, отключить можно явно, выставив таймаут в 0.
{% cut "Технические детали" %}
**Корневая причина:** реконнект планировался только в `OnGrpcCallFinished`, то есть после завершения активного stream-вызова. При «зависшем» транспорте (transport stall после падения агента) gRPC мог не вызывать финальный callback, `ActiveGrpcCall` оставался занятым, и новый `MakeGrpcCall` не запускался.
**Изменения в коде:**
- В `TClientParameters` добавлен параметр `GrpcCallInactivityTimeout` (по умолчанию 30 с; 0 = отключено).
- В сессии заведён периодический watchdog-таймер; при срабатывании проверяется: есть ли активный `TGrpcCall`, есть ли inflight-сообщения и не было ли активности дольше заданного таймаута. При выполнении условий вызывается принудительное закрытие текущего call (`BeginClose(true)`), что приводит к `OnGrpcCallFinished` и планированию следующего `MakeGrpcCall` по `GrpcReconnectDelay`.
- Время последней активности обновляется при любом успешном событии стрима (Accept, Read, Write, Finish, а также при инициализации сессии и при ack).
**Тестирование:** добавлен интеграционный тест `TestReconnectAfterLongStall` (агент поднимается → сессия и первая доставка → убийство агента → отправка во время даунтайма → пауза → повторный запуск агента → проверка, что inflight обнуляется и сессия переинициализируется без рестарта приложения). Регрессии по существующим reconnect-тестам не наблюдаются.
{% endcut %}
commit_hash:f55efec2cff20fa975500e73d60ee57654abb2e0
|
| |
|
|
| |
commit_hash:1f5f18fdf0f2495984459d943dd860d4ae2c6202
|
| |
|
|
| |
commit_hash:4821db4ea5b9e016f8c478cb6a7a1ea63b1bc326
|
| |
|
|
| |
commit_hash:eee25420ad6ea93339717787cd90187d2804df76
|
| |
|
|
| |
commit_hash:343a4d8f871af0716a506a7b4a806821b6960075
|
| |
|
|
| |
commit_hash:def1549af31025d7b902e3bf92206a03053f5a71
|
| |
|
|
| |
commit_hash:2c3caf7a2a8655649650230a87041de32d894601
|
| |
|
|
| |
commit_hash:b314e8a95a2e0b045610b036e73d627d36a862fb
|
| |
|
|
| |
commit_hash:91ce0e2023d29ef78fadeaded314ea236f316b8e
|
| |
|
|
| |
commit_hash:7bea862660ba289cd58b74fac07d57aa126aa1c1
|
| |
|
|
| |
commit_hash:515a085472bab89404ad6e51f91be61ac01d9094
|
| |
|
|
| |
commit_hash:02ffad2516143e0c872a0d52eda33561e23e3ec2
|
| |
|
|
| |
commit_hash:fec2286eb1fd675e7d50f7da7695f393a5b0a5aa
|
| |
|
|
| |
commit_hash:25c6545fed2bffe20f7a008a218b9245896926ec
|
| |
|
|
| |
commit_hash:70f4e2c519e189107c5e9dec6ba45eeb58973f46
|
| |
|
|
| |
commit_hash:7a04b71703a53450dbaa8ec880141545e019a71e
|
| |
|
|
| |
commit_hash:3ffe8eb8d830a76efbd1fad2e6ebda2b140215c7
|
| |
|
|
| |
commit_hash:6f0226afc878b77a48efdd142971b568aa0bf405
|
| |
|
|
| |
commit_hash:6b6fb1b60f32209f80d85e4e33889e192a35ac07
|
| |
|
|
| |
commit_hash:986e0f49b3f095829d1a8af7f2f88521055e63a1
|
| |
|
|
| |
commit_hash:2ba91e501cb0d1d5f727d7018af5bdc34ebd51f2
|
| |
|
|
| |
commit_hash:ab715a19c7c9819d6130970851b148010f9189bd
|
| |
|
|
| |
commit_hash:6f47ff10b7e4503466c226a6d3df0637a5c14715
|
| |
|
|
| |
commit_hash:2a9ac53b44252aeea5188e821743fc79b064947b
|
| |
|
|
| |
commit_hash:3b7f35f613737aa92f473877e8b24782f7dfcd2a
|
| |
|
|
| |
commit_hash:520dd74e9a7cc021d91d1f5a2b6db883b21188c5
|
| |
|
|
|
| |
Add same method as in TFastElasticQueue, and test.
commit_hash:0a2b618325e57c32fd269254a7dbe912849c3f10
|
| |
|
|
| |
commit_hash:c574736c9cbb7c6da6502dc751214d8d7f343568
|
| |
|
|
|
|
| |
Y_NO_SANITIZE("address")
commit_hash:30841b1871a64fd6b3cc1eebcc9e4d5f1281c4fa
|
| |
|
|
| |
commit_hash:45c2f193373485b196c00cf6cec6faf32822bb4a
|
| |
|
|
| |
commit_hash:1acc39366e0b54658fb2847e848a9bc3b7a8b8a6
|
| |
|
|
| |
commit_hash:5d980b19ed177f3a4ce03ba7c7d89ab9d711b8e8
|
| |
|
|
| |
commit_hash:e5545cade7cc946c943e85c680db7c276edc48b5
|
| |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| |
### Описание
В данном PR реализован механизм базовой фильтрации метрик в Unified Agent. Цель — по умолчанию исключать некоторые "тяжелые" или избыточные метрики для снижения нагрузки, оставляя при этом возможность запросить полный набор.
**Основные изменения:**
* **Новый пресет фильтрации:** В `ESetParameter` добавлен режим `basic`. В отличие от `minimal` и `advanced`, работающих по принципу "белого списка" (allowlist), режим `basic` работает как "черный список" (blocklist) и исключает только заданные метрики.
* **Черный список по умолчанию:** Реализована функция `GetDefaultBlockedMetricNames()`. На данный момент в этот список добавлена метрика `UnacknowledgedBytes`.
* **Логика фильтрации:** Обновлен класс `TCountersFilter`. Теперь методы `GetAllowedNames` и `GetBlockedNames` используют единый `switch-case` по значениям `ESetParameter`. Если параметр `@set` пуст или не распознан, по умолчанию применяется поведение пресета `basic`. Пресет `all` корректно отключает любую фильтрацию.
* **Изменение дефолтного поведения:** Для плагина `agent_metrics_input` значение по умолчанию в поле `Preset` изменено на `basic`. Теперь `TCountersFilter` инстанцируется и применяется всегда, гарантируя консистентную фильтрацию.
**Тестирование:**
* В конфигурацию `test_counters_set_parameter.yml` добавлен `fs` storage и пайплайн для отправки данных через него. Это было необходимо, чтобы агент начал генерировать метрику `UnacknowledgedBytes`, связанную с файловым хранилищем.
* В `generic_tests.cpp` значительно переработан `TestCountersSetParameter`. Чтобы избежать "мигания" (flakiness) тестов из-за меняющихся значений счетчиков или появления служебных логов (например, `RecordsReceived`), реализована кастомная функция `getSensorSchema`. Она извлекает и сравнивает только структуру метрик (`kind` и `labels`), игнорируя их числовые значения.
* Добавлены строгие ассерты: тест проверяет, что метрика `UnacknowledgedBytes` **отсутствует** при базовом запросе (или при `?@set=basic`), но **гарантированно присутствует** при явном запросе `?@set=all`.
commit_hash:78b757d4c55f496e0f041ddfba8f385784b46881
|
| |
|
|
|
| |
Странно, что такого метода не было. А он полезен, если хочешь хачить уже имеющуюся структуру.
commit_hash:09763c7c08d149a27867a4cf5908b694b3024e99
|
| |
|
|
| |
commit_hash:f7e9a90ba49fd848eaff6b094704bd3b2946c40f
|
| |
|
|
| |
commit_hash:d068d68a89226c414a3d5a1f8ad102579bdd233b
|
| |
|
|
| |
commit_hash:6962a6b28ec3ee5fdc2392a2415810aaa54943c4
|
| |
|
|
| |
commit_hash:6e15bd10891b4a28d44b0bf6ef5e72c33b047eb1
|
| |
|
|
| |
commit_hash:856b6638d9642f4711f5058d77bd58dd78accb8a
|
| |
|
|
| |
commit_hash:1deb4fa77014b102af1c79903f60641f23a50813
|
| |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| |
## 1. Где и когда это может происходить
Гонка проявляется при **одновременном** завершении задачи (Unregister/Join) и выполнении finish-действия `CommitTimer()` в воркере. Типичные сценарии:
- **Остановка агента** — при shutdown закрываются сессии, по задачам вызывается Unregister(); в это же время воркер может доходить до finish-действий уже «уходящей» задачи.
- **Закрытие сессии/канала** — например, отключение клиента, ошибка — владелец вызывает Unregister() по задаче, в которой использовался таймер.
- **Плагины и задачи, подверженные багу:** любые, что используют `TLocalTimersQueue` (отложенные таймеры через `DelayedExecutor`). В коде UA это, в частности:
- **http_output** (`plugins/lib/http/http_sender.cpp`) — таймеры для отложенных повторов запросов и flush;
- **file_input** (`plugins/file_input/file_input.cpp`) — таймеры при работе с файлами;
- **logbroker_output_new** (`plugins/logbroker_output_new/logbroker_output_impl.h`) — таймер обновления метрик.
Во всех этих случаях задача владеет `TLocalTimersQueue`, при установке таймера в finish-действие попадает `CommitTimer()`, и при быстром Unregister() возможна гонка с Join().
---
{% cut "Зачем потоку воркера вызывать Ref()" %}
Таймер планируется **асинхронно**: `DelayedExecutor.SetTimer(Timer, triggerTime)` регистрирует callback в отдельном потоке (sleeper). Когда время наступит, callback вызовется **уже после** того, как `Run()` задачи завершился. Задача и её контекст (сессия, `TLocalTimersQueue`, `ExecutionJoiner`) должны оставаться валидными до срабатывания или отмены таймера — иначе callback приведёт к use-after-free.
**Ref()** — это «удержание» задачи: пока есть лишний Ref, `Join()` не завершится (Refs не станет 0). То есть: «задача не считается полностью завершённой, пока таймер не сработал или не был сброшен». Когда таймер сбрасывают или очередь финализируют, вызывается **UnRef()** — тогда задача может перейти в joined.
**Где именно вызывается Ref/UnRef:**
Файл `logbroker/unified_agent/common/delayed_executor.cpp`:
```cpp
void TLocalTimersQueue::CommitTimer() {
if (Queue.GetCount() > 0) {
const auto triggerTime = Top().Value();
if (!TimerTriggerTime.Defined() || ...) {
if (!TimerTriggerTime.Defined()) {
// ← ЗДЕСЬ: перед первой установкой таймера в DelayedExecutor
// держим задачу «живой» до срабатывания/сброса таймера
if (!TTaskExecutor::CurrentTaskOrDie().ExecutionJoiner().TryRef()) {
CommitTimerScheduled = false;
return;
}
}
DelayedExecutor.SetTimer(Timer, triggerTime); // асинхронный таймер
TimerTriggerTime = triggerTime;
}
} else if (TimerTriggerTime.Defined()) {
DelayedExecutor.ResetTimer(Timer);
TTaskExecutor::CurrentTaskOrDie().ExecutionJoiner().UnRef(); // таймер снят — отдаём Ref
TimerTriggerTime.Clear();
}
...
}
```
`CommitTimer()` вызывается из **finish-действия** задачи (добавляется в `EnsureCommitTimerScheduled()` → `AddFinishAction([this]() { CommitTimer(); })`), т.е. выполняется в потоке воркера после выхода из `Run()`. Пример использования таймера из кода плагина — `plugins/lib/http/http_sender.cpp`: там `LocalTimerQueue.SetTimer(request->Timer, triggerTime)` планирует отложенный повтор запроса; callback при срабатывании постит событие в сессию.
{% endcut %}
---
{% cut "Контекст: участники и суть гонки" %}
**Участники:**
- **ExecutionJoiner** (`TAsyncJoiner`) — объект с атомарным счётчиком `Refs` (начальное значение 1). Пока `Refs >= 1`, задачу считают «активной». `Join()` вызывает `UnRef()`; когда `Refs` становится 0, вызывается `Promise.SetValue()` («joined»).
- **Поток задачи (Task/Unregister)** — владелец задачи; вызывает `Unregister()` → `ExecutionJoiner_.Join()` → внутри один раз `UnRef()`.
- **Поток таймера (Worker/Timer)** — воркер пула задач; выполняет finish-действия задачи. Одно из них — `CommitTimer()`, которое при первой установке таймера вызывало `ExecutionJoiner().Ref()`.
**Гонка:** между моментом, когда поток задачи делает `Join()` (и доводит `Refs` до 0), и моментом, когда поток воркера выполняет `CommitTimer()` и вызывает `Ref()`. Если `Ref()` вызывается уже после перехода в «joined», `fetch_add(1)` возвращает 0 и срабатывает `Y_ABORT_UNLESS(result >= 1, "already joined")`.
{% endcut %}
---
{% cut "До фикса: креш при гонке" %}
Поток задачи вызывает `Unregister()` и ждёт `Join()`. Поток воркера после завершения `Run()` выполняет finish-действие `CommitTimer()`. Если к этому моменту `Join()` уже выполнил `UnRef()` и `Refs == 0`, вызов `Ref()` в `CommitTimer()` приводит к падению.
```mermaid
sequenceDiagram
participant TaskThread as Поток задачи
participant Joiner as ExecutionJoiner
participant WorkerThread as Поток воркера
Note over TaskThread,WorkerThread: Задача с таймером: Run() вызвал SetTimer(), в finish-действия добавлен CommitTimer()
TaskThread->>TaskThread: Unregister()
TaskThread->>Joiner: Join()
Joiner->>Joiner: UnRef() → Refs = 0
Joiner->>Joiner: Promise.SetValue() — joined
WorkerThread->>WorkerThread: Выполняет finish-действия
WorkerThread->>WorkerThread: CommitTimer()
WorkerThread->>Joiner: Ref()
Joiner->>Joiner: fetch_add(1) → result = 0
Joiner->>WorkerThread: Y_ABORT_UNLESS(result >= 1) — CRASH
```
**Итог:** в момент вызова `Ref()` в `CommitTimer()` объект уже в состоянии «joined» (`Refs == 0`), проверка в `Ref()` не выполняется → **SIGABRT**.
{% endcut %}
---
{% cut "После фикса: корректный выход без креша" %}
В `CommitTimer()` вместо `Ref()` вызывается `TryRef()`: атомарно проверяется `Refs >= 1` (через CAS); если уже 0, `TryRef()` возвращает `false` и `CommitTimer()` сразу выходит, не вызывая `Ref()` и не трогая таймер.
```mermaid
sequenceDiagram
participant TaskThread as Поток задачи
participant Joiner as ExecutionJoiner
participant WorkerThread as Поток воркера
Note over TaskThread,WorkerThread: Та же гонка: Join() и CommitTimer() выполняются почти одновременно
TaskThread->>TaskThread: Unregister()
TaskThread->>Joiner: Join()
Joiner->>Joiner: UnRef() → Refs = 0
Joiner->>Joiner: Promise.SetValue() — joined
WorkerThread->>WorkerThread: Выполняет finish-действия
WorkerThread->>WorkerThread: CommitTimer()
WorkerThread->>Joiner: TryRef()
Joiner->>Joiner: load(Refs) = 0 → current < 1
Joiner->>WorkerThread: return false
WorkerThread->>WorkerThread: CommitTimerScheduled = false return
Note over WorkerThread: Таймер не ставится, креша нет
```
**Итог:** при уже «joined» состоянии `TryRef()` возвращает `false`, `CommitTimer()` завершается без вызова `Ref()` и без падения.
{% endcut %}
---
{% cut "Сводка изменений" %}
| Место | До фикса | После фикса |
|-------|----------|-------------|
| `CommitTimer()` при первой установке таймера | `Ref()` → при Refs=0 креш | `TryRef()` → при `false` ранний выход |
| `TAsyncJoiner` | Только `Ref()` / `UnRef()` | Добавлен `TryRef()` (CAS, при refs<1 возврат false) |
| `Finalize()` | Не сбрасывал активный таймер | При `TimerTriggerTime.Defined()` — `ResetTimer`, `UnRef()`, `Clear()` до `Finalized = true` |
Тест `TestTimerQueueUnregisterNoCrash` (500 итераций: задача с таймером → Pulse → Unregister) без фикса периодически воспроизводит креш; с фиксом — стабильно зелёный.
{% endcut %}
commit_hash:5f57d88fc53f44db31e87deaeca57a7e9ef262ca
|
| |
|
|
| |
commit_hash:d1b4740ab0854d5cd2baaff61c17709d97f398f0
|
| |
|
|
| |
commit_hash:7b0bb805a82d9829ea93f5b867962e77a2c56244
|
| |
|
|
| |
commit_hash:6d5cf8bea86a35efd558df2aaec98702dc514f1c
|
| |
|
|
| |
commit_hash:0f03724e68eb4d3d411946e1c7b201d1b79f951a
|
| |
|
|
| |
commit_hash:f68a98021cf1e69e6ac402deb24dc2eef75162f5
|
| |
|
|
| |
commit_hash:7d0102e6cd782673cf13aa1236a16c60bd9dfc3a
|
| |
|
|
| |
commit_hash:4744aaa8f540cfc9351f1aacb9e1431d115aed85
|