diff options
| author | andybg <[email protected]> | 2026-03-14 04:44:54 +0300 |
|---|---|---|
| committer | andybg <[email protected]> | 2026-03-14 05:16:45 +0300 |
| commit | f462fa40c277092fe8810533f85712c7ccb02215 (patch) | |
| tree | 3d19eb7be2c9a9785c485b31bf716675a0f7e7a0 /library/cpp/yt/threading/execution_stack.cpp | |
| parent | ccadac8e834b968df67663e235855ed61701d4d7 (diff) | |
Fix TAsyncJoiner "already joined" crash
## 1. Где и когда это может происходить
Гонка проявляется при **одновременном** завершении задачи (Unregister/Join) и выполнении finish-действия `CommitTimer()` в воркере. Типичные сценарии:
- **Остановка агента** — при shutdown закрываются сессии, по задачам вызывается Unregister(); в это же время воркер может доходить до finish-действий уже «уходящей» задачи.
- **Закрытие сессии/канала** — например, отключение клиента, ошибка — владелец вызывает Unregister() по задаче, в которой использовался таймер.
- **Плагины и задачи, подверженные багу:** любые, что используют `TLocalTimersQueue` (отложенные таймеры через `DelayedExecutor`). В коде UA это, в частности:
- **http_output** (`plugins/lib/http/http_sender.cpp`) — таймеры для отложенных повторов запросов и flush;
- **file_input** (`plugins/file_input/file_input.cpp`) — таймеры при работе с файлами;
- **logbroker_output_new** (`plugins/logbroker_output_new/logbroker_output_impl.h`) — таймер обновления метрик.
Во всех этих случаях задача владеет `TLocalTimersQueue`, при установке таймера в finish-действие попадает `CommitTimer()`, и при быстром Unregister() возможна гонка с Join().
---
{% cut "Зачем потоку воркера вызывать Ref()" %}
Таймер планируется **асинхронно**: `DelayedExecutor.SetTimer(Timer, triggerTime)` регистрирует callback в отдельном потоке (sleeper). Когда время наступит, callback вызовется **уже после** того, как `Run()` задачи завершился. Задача и её контекст (сессия, `TLocalTimersQueue`, `ExecutionJoiner`) должны оставаться валидными до срабатывания или отмены таймера — иначе callback приведёт к use-after-free.
**Ref()** — это «удержание» задачи: пока есть лишний Ref, `Join()` не завершится (Refs не станет 0). То есть: «задача не считается полностью завершённой, пока таймер не сработал или не был сброшен». Когда таймер сбрасывают или очередь финализируют, вызывается **UnRef()** — тогда задача может перейти в joined.
**Где именно вызывается Ref/UnRef:**
Файл `logbroker/unified_agent/common/delayed_executor.cpp`:
```cpp
void TLocalTimersQueue::CommitTimer() {
if (Queue.GetCount() > 0) {
const auto triggerTime = Top().Value();
if (!TimerTriggerTime.Defined() || ...) {
if (!TimerTriggerTime.Defined()) {
// ← ЗДЕСЬ: перед первой установкой таймера в DelayedExecutor
// держим задачу «живой» до срабатывания/сброса таймера
if (!TTaskExecutor::CurrentTaskOrDie().ExecutionJoiner().TryRef()) {
CommitTimerScheduled = false;
return;
}
}
DelayedExecutor.SetTimer(Timer, triggerTime); // асинхронный таймер
TimerTriggerTime = triggerTime;
}
} else if (TimerTriggerTime.Defined()) {
DelayedExecutor.ResetTimer(Timer);
TTaskExecutor::CurrentTaskOrDie().ExecutionJoiner().UnRef(); // таймер снят — отдаём Ref
TimerTriggerTime.Clear();
}
...
}
```
`CommitTimer()` вызывается из **finish-действия** задачи (добавляется в `EnsureCommitTimerScheduled()` → `AddFinishAction([this]() { CommitTimer(); })`), т.е. выполняется в потоке воркера после выхода из `Run()`. Пример использования таймера из кода плагина — `plugins/lib/http/http_sender.cpp`: там `LocalTimerQueue.SetTimer(request->Timer, triggerTime)` планирует отложенный повтор запроса; callback при срабатывании постит событие в сессию.
{% endcut %}
---
{% cut "Контекст: участники и суть гонки" %}
**Участники:**
- **ExecutionJoiner** (`TAsyncJoiner`) — объект с атомарным счётчиком `Refs` (начальное значение 1). Пока `Refs >= 1`, задачу считают «активной». `Join()` вызывает `UnRef()`; когда `Refs` становится 0, вызывается `Promise.SetValue()` («joined»).
- **Поток задачи (Task/Unregister)** — владелец задачи; вызывает `Unregister()` → `ExecutionJoiner_.Join()` → внутри один раз `UnRef()`.
- **Поток таймера (Worker/Timer)** — воркер пула задач; выполняет finish-действия задачи. Одно из них — `CommitTimer()`, которое при первой установке таймера вызывало `ExecutionJoiner().Ref()`.
**Гонка:** между моментом, когда поток задачи делает `Join()` (и доводит `Refs` до 0), и моментом, когда поток воркера выполняет `CommitTimer()` и вызывает `Ref()`. Если `Ref()` вызывается уже после перехода в «joined», `fetch_add(1)` возвращает 0 и срабатывает `Y_ABORT_UNLESS(result >= 1, "already joined")`.
{% endcut %}
---
{% cut "До фикса: креш при гонке" %}
Поток задачи вызывает `Unregister()` и ждёт `Join()`. Поток воркера после завершения `Run()` выполняет finish-действие `CommitTimer()`. Если к этому моменту `Join()` уже выполнил `UnRef()` и `Refs == 0`, вызов `Ref()` в `CommitTimer()` приводит к падению.
```mermaid
sequenceDiagram
participant TaskThread as Поток задачи
participant Joiner as ExecutionJoiner
participant WorkerThread as Поток воркера
Note over TaskThread,WorkerThread: Задача с таймером: Run() вызвал SetTimer(), в finish-действия добавлен CommitTimer()
TaskThread->>TaskThread: Unregister()
TaskThread->>Joiner: Join()
Joiner->>Joiner: UnRef() → Refs = 0
Joiner->>Joiner: Promise.SetValue() — joined
WorkerThread->>WorkerThread: Выполняет finish-действия
WorkerThread->>WorkerThread: CommitTimer()
WorkerThread->>Joiner: Ref()
Joiner->>Joiner: fetch_add(1) → result = 0
Joiner->>WorkerThread: Y_ABORT_UNLESS(result >= 1) — CRASH
```
**Итог:** в момент вызова `Ref()` в `CommitTimer()` объект уже в состоянии «joined» (`Refs == 0`), проверка в `Ref()` не выполняется → **SIGABRT**.
{% endcut %}
---
{% cut "После фикса: корректный выход без креша" %}
В `CommitTimer()` вместо `Ref()` вызывается `TryRef()`: атомарно проверяется `Refs >= 1` (через CAS); если уже 0, `TryRef()` возвращает `false` и `CommitTimer()` сразу выходит, не вызывая `Ref()` и не трогая таймер.
```mermaid
sequenceDiagram
participant TaskThread as Поток задачи
participant Joiner as ExecutionJoiner
participant WorkerThread as Поток воркера
Note over TaskThread,WorkerThread: Та же гонка: Join() и CommitTimer() выполняются почти одновременно
TaskThread->>TaskThread: Unregister()
TaskThread->>Joiner: Join()
Joiner->>Joiner: UnRef() → Refs = 0
Joiner->>Joiner: Promise.SetValue() — joined
WorkerThread->>WorkerThread: Выполняет finish-действия
WorkerThread->>WorkerThread: CommitTimer()
WorkerThread->>Joiner: TryRef()
Joiner->>Joiner: load(Refs) = 0 → current < 1
Joiner->>WorkerThread: return false
WorkerThread->>WorkerThread: CommitTimerScheduled = false return
Note over WorkerThread: Таймер не ставится, креша нет
```
**Итог:** при уже «joined» состоянии `TryRef()` возвращает `false`, `CommitTimer()` завершается без вызова `Ref()` и без падения.
{% endcut %}
---
{% cut "Сводка изменений" %}
| Место | До фикса | После фикса |
|-------|----------|-------------|
| `CommitTimer()` при первой установке таймера | `Ref()` → при Refs=0 креш | `TryRef()` → при `false` ранний выход |
| `TAsyncJoiner` | Только `Ref()` / `UnRef()` | Добавлен `TryRef()` (CAS, при refs<1 возврат false) |
| `Finalize()` | Не сбрасывал активный таймер | При `TimerTriggerTime.Defined()` — `ResetTimer`, `UnRef()`, `Clear()` до `Finalized = true` |
Тест `TestTimerQueueUnregisterNoCrash` (500 итераций: задача с таймером → Pulse → Unregister) без фикса периодически воспроизводит креш; с фиксом — стабильно зелёный.
{% endcut %}
commit_hash:5f57d88fc53f44db31e87deaeca57a7e9ef262ca
Diffstat (limited to 'library/cpp/yt/threading/execution_stack.cpp')
0 files changed, 0 insertions, 0 deletions
