diff options
author | ponasenko-rs <ponasenko-rs@yandex-team.com> | 2024-11-21 16:59:36 +0300 |
---|---|---|
committer | ponasenko-rs <ponasenko-rs@yandex-team.com> | 2024-11-21 17:15:13 +0300 |
commit | cafe0d9f1f5555e43c16cbb1c73375fee224eca6 (patch) | |
tree | 36bd0e512511f644e3b3a0d7b861ca4658b2ae71 /yt | |
parent | 7d6de1167d164a6966d836f636d4027ced58c06a (diff) | |
download | ydb-cafe0d9f1f5555e43c16cbb1c73375fee224eca6.tar.gz |
YT-23585: Fix race between Cancel and GetCanceledError in Canceler
commit_hash:2c1b31d95037975cf4703cb90f81232f880a37e6
Diffstat (limited to 'yt')
-rw-r--r-- | yt/yt/core/concurrency/fiber_scheduler_thread.cpp | 25 |
1 files changed, 14 insertions, 11 deletions
diff --git a/yt/yt/core/concurrency/fiber_scheduler_thread.cpp b/yt/yt/core/concurrency/fiber_scheduler_thread.cpp index 441d093a4c..72130a6717 100644 --- a/yt/yt/core/concurrency/fiber_scheduler_thread.cpp +++ b/yt/yt/core/concurrency/fiber_scheduler_thread.cpp @@ -621,7 +621,17 @@ public: void SetFuture(TFuture<void> awaitable) { auto guard = Guard(Lock_); - Future_ = std::move(awaitable); + if (!IsCanceled()) { + Future_ = std::move(awaitable); + return; + } + + guard.Release(); + + ErrorSet_.Wait(); + + YT_ASSERT(!CancelationError_.IsOK()); + awaitable.Cancel(CancelationError_); } void ResetFuture() @@ -644,6 +654,8 @@ public: future = std::move(Future_); } + ErrorSet_.NotifyAll(); + if (future) { YT_LOG_DEBUG("Sending cancelation to fiber, propagating to the awaited future (TargetFiberId: %x)", FiberId_); @@ -654,12 +666,6 @@ public: } } - TError GetCancelationError() const - { - auto guard = Guard(Lock_); - return CancelationError_; - } - void Run(const TError& error) { Cancel(error); @@ -680,6 +686,7 @@ private: const TFiberId FiberId_; std::atomic<bool> Canceled_ = false; + NThreading::TEvent ErrorSet_; NThreading::TSpinLock Lock_; TError CancelationError_; TFuture<void> Future_; @@ -1173,10 +1180,6 @@ void WaitUntilSet(TFuture<void> future, IInvokerPtr invoker) GetCurrentFiberCanceler(); const auto& canceler = NDetail::GetFiberSwitchHandler()->Canceler(); - if (canceler->IsCanceled()) { - future.Cancel(canceler->GetCancelationError()); - } - canceler->SetFuture(future); auto finally = Finally([&] { canceler->ResetFuture(); |