diff options
| author | andybg <[email protected]> | 2026-06-18 16:15:00 +0300 |
|---|---|---|
| committer | andybg <[email protected]> | 2026-06-18 17:18:49 +0300 |
| commit | 7873460179706278f09ef7c45150039a5d9433f5 (patch) | |
| tree | a6d8af6b6b0ad66f0acf0d795a6ed44e1f831b01 /contrib/tools/python3/src/Python/frozen_modules/getpath.h | |
| parent | 78ccc4ef49a6bc5ae2a8ede546547ef65286a8b3 (diff) | |
Serialize TGrpcTimer on completion queue thread
## 1. В чём была проблема
У сессии gRPC-клиента (`TClientSession`) несколько таймеров на базе **`TGrpcTimer`** используют один и тот же механизм: внутри лежит **`grpc::Alarm`**, привязанный к **одному** `CompletionQueue`, события с которого обрабатывает **отдельный поток** poller (в логах и стеках — `ua_grpc_cq`).
**Публичные методы** `TGrpcTimer::Set` и `Cancel` вызывались **и с этого потока** (после срабатывания alarm, из колбэков завершения gRPC-операций, из `Poll` и т.д.), **и с других потоков** — например, при старте сессии из потока пула (`DoStart` → первый `MakeGrpcCallTimer->Set`). В результате **один и тот же** `grpc::Alarm` и связанные с ним поля таймера менялись **конкурентно без синхронизации**. ThreadSanitizer фиксировал **data race** внутри реализации alarm в gRPC; с точки зрения контракта это **недопустимое параллельное использование** обёртки над alarm.
Типичный конфликт: на потоке приложения выполняется **`Set`** (первый запуск таймера переподключения), параллельно на **`ua_grpc_cq`** обрабатывается завершение вызова и снова вызывается **`Set`** для отложенного переподключения — оба попадают в **`Alarm::Set`** для одного объекта.
---
## 2. Изменение архитектуры (как починили)
Инвариант: **любое изменение состояния `TGrpcTimer`, которое трогает `grpc::Alarm` и служебные поля таймера, выполняется только на потоке, который крутит `CompletionQueue::Next` для этого клиента** (тот же поток, что обрабатывает срабатывания alarm).
Для этого:
- Введены внутренние **`ApplySet` / `ApplyCancel`** — в них перенесена прежняя логика работы с alarm; вызывать их разрешено **только** в контексте poller.
- Публичные **`Set` / `Cancel`**: если вызов уже идёт **из** poller (определяется **thread-local** флагом на время обработки события из CQ), сразу вызываются **`Apply*`**; иначе работа **ставится в ту же** `CompletionQueue` через **искусственное завершение** (`grpc_cq_begin_op` / `grpc_cq_end_op`), а колбэк на стороне poller выполняет **`Apply*`**.
- Чтобы отложенная операция не обращалась к уже уничтоженной сессии, перед постановкой в очередь делается **`TryRef`** на **`TAsyncJoiner`** сессии; после выполнения **`Apply*`** — **`UnRef`**. Если сессия уже уходит в закрытие и joiner недоступен, отложенный **`Set`/`Cancel`** тихо отбрасывается.
С точки зрения **`client_impl`**, вызовы **`MakeGrpcCallTimer->Set`**, **`ForceCloseTimer->Set`**, **`PollTimer->Set`**, **`->Cancel`** **не менялись** — меняется только реализация внутри **`TGrpcTimer`** и конструктор (передаётся ссылка на **`AsyncJoiner`** сессии).
---
## 3. Новая архитектура: sequence diagram и пример
{% cut "Таблица потоков, примеры A/B и диаграммы Mermaid" %}
Ниже — **два типичных сценария** для одного таймера, например **`MakeGrpcCallTimer`** (переподключение после завершения gRPC-вызова).
### Где именно теперь «живёт» работа с таймером
| Действие | Поток |
| :--- | :--- |
| Публичный **`Set` / `Cancel`** с **стороны приложения** (не poller) | Постановка задачи в **CQ**; реальное **`Apply*`** — на **`ua_grpc_cq`**. |
| Публичный **`Set` / `Cancel`** уже **внутри** обработчика события CQ (вложенный вызов) | Сразу **`Apply*`** на том же потоке (**без** повторной постановки). |
| Срабатывание **`grpc::Alarm`** | Доставка в poller → **`TGrpcTimer::OnIOCompleted`** → при необходимости снова **`Alarm.Set`** / вызов пользовательского колбэка — **всё на `ua_grpc_cq`**. |
### Пример A: первый `Set` при старте сессии (поток приложения)
Сессия стартует в **`DoStart`** на **рабочем** потоке; **`MakeGrpcCallTimer->Set(Now())`** не трогает alarm напрямую — **ставит** в CQ задачу «выполнить **`ApplySet`**»; poller **выполняет** её и выставляет alarm.
```mermaid
sequenceDiagram
participant App as AppThread
participant Timer as TGrpcTimer
participant CQ as CompletionQueue
participant Poller as ua_grpc_cq
App->>Timer: Set(now)
Timer->>Timer: not poller thread
Timer->>Timer: TryRef(AsyncJoiner)
Timer->>CQ: enqueue synthetic op
CQ-->>Poller: Next delivers op
Poller->>Timer: deferred callback
Timer->>Timer: ApplySet(now)
Timer->>Timer: Alarm.Set + schedule
Timer->>Timer: UnRef(AsyncJoiner)
```
### Пример B: перепланирование после завершения вызова (уже на poller)
**`OnGrpcCallFinished`** вызывается с **CQ** после обработки тега gRPC; **`MakeGrpcCallTimer->Set(reconnectTime)`** попадает в **fast path** и сразу вызывает **`ApplySet`** на **`ua_grpc_cq`** — без очереди.
```mermaid
sequenceDiagram
participant Poller as ua_grpc_cq
participant Session as TClientSession
participant Timer as TGrpcTimer
Poller->>Session: OnGrpcCallFinished
Session->>Timer: Set(reconnectTime)
Timer->>Timer: poller thread (TLS)
Timer->>Timer: ApplySet(reconnectTime)
```
### Срабатывание alarm (напоминание)
Когда срабатывает **внутренний** alarm gRPC, poller получает тег **`TGrpcTimer`**, вызывает **`OnIOCompleted`**: там снова возможны **`Alarm.Set`** (перенос по **`NextTriggerTime`**) или переход к **пользовательскому** колбэку (**`MakeGrpcCall`**, **`Poll`**, **`BeginClose`** и т.д.) — **всё на том же потоке `ua_grpc_cq`**.
{% endcut %}
---
{% cut "Технические детали (файлы, API gRPC, TSAN, lifetime)" %}
### Файлы в Arcadia
| Файл | Роль |
| :--- | :--- |
| `library/cpp/unified_agent_client/grpc_io.h`, `grpc_io.cpp` | `TGrpcTimer`, `TPostedCompletion` / `TPostedBridge`, `PostIIOCallbackToCompletionQueue`, `TlsInUaGrpcCompletionQueuePoller` в цикле poller. |
| `library/cpp/unified_agent_client/client_impl.cpp` | Создание трёх `TGrpcTimer` с передачей `AsyncJoiner` сессии. |
### Низкоуровневая постановка в CQ
Паттерн тот же, что у **`TGrpcNotification::Trigger`**: **`grpc_core::ApplicationCallbackExecCtx`**, **`grpc_core::ExecCtx`**, **`grpc_cq_begin_op`**, **`grpc_cq_end_op`**. Тег **`CompletionQueueTag`** — отдельный объект; в **`FinalizeResult`** в poller передаётся **`IIOCallback`** (мост), который выполняет отложенную лямбду и освобождает себя после **`OnIOCompleted`**.
### TSAN ()
В отчёте TSAN конкурирующие записи шли в **`grpc::internal::AlarmImpl::Set`** из потока **`ua_grpc_cq`** (цепочка **`OnGrpcCallFinished` → `MakeGrpcCallTimer->Set`**) и из потока приложения (**`DoStart` → `MakeGrpcCallTimer->Set(Now())`**). После фикса оба пути сериализуют **`ApplySet`** на poller.
### Поведение при закрытии сессии
Если **`TryRef(AsyncJoiner)`** не удался, отложенный **`Set`/`Cancel`** не ставится — сессия уже в фазе **`Join`**. Уже стоящие в CQ задачи удерживают ref до выполнения **`Apply*`** и **`UnRef`**.
### Заглушка счётчика для `MakeIOCallback`
Для отложенной лямбды используется **`TNoOpRefStub`**: **`Ref`/`UnRef`** пустые; удержание сессии обеспечивается парой **`TryRef`/`UnRef`** на **`TAsyncJoiner`**, а не счётчиком **`TIOCallback`**.
{% endcut %}
commit_hash:ba05e9c98e41bcf748270a48a818cc7d233a161b
Diffstat (limited to 'contrib/tools/python3/src/Python/frozen_modules/getpath.h')
0 files changed, 0 insertions, 0 deletions
