diff options
author | pechatnov <pechatnov@yandex-team.com> | 2025-02-13 15:01:37 +0300 |
---|---|---|
committer | pechatnov <pechatnov@yandex-team.com> | 2025-02-13 15:28:49 +0300 |
commit | a8b10ce3de933ec55e006347e27da8f08e9cc598 (patch) | |
tree | cb80754d8b26803051431a3754b73aa02601d4cb | |
parent | db0b8a1c047b7a7770e1649d0fa3551456bc96c2 (diff) | |
download | ydb-a8b10ce3de933ec55e006347e27da8f08e9cc598.tar.gz |
YT: Fix race on vptr in TServiceBase
commit_hash:d96419b9ca6a790844a064b3da7710e45964159f
-rw-r--r-- | yt/yt/core/rpc/service_detail.cpp | 27 | ||||
-rw-r--r-- | yt/yt/core/rpc/service_detail.h | 4 |
2 files changed, 26 insertions, 5 deletions
diff --git a/yt/yt/core/rpc/service_detail.cpp b/yt/yt/core/rpc/service_detail.cpp index f2dec276cb..065e417c1c 100644 --- a/yt/yt/core/rpc/service_detail.cpp +++ b/yt/yt/core/rpc/service_detail.cpp @@ -1654,8 +1654,6 @@ TServiceBase::TServiceBase( Profiler_.AddFuncGauge("/authentication_queue_size", MakeStrong(this), [this] { return AuthenticationQueueSize_.load(std::memory_order::relaxed); }); - - ServiceLivenessChecker_->Start(); } const TServiceId& TServiceBase::GetServiceId() const @@ -2431,6 +2429,25 @@ void TServiceBase::DecrementActiveRequestCount() void TServiceBase::InitContext(IServiceContext* /*context*/) { } +void TServiceBase::StartServiceLivenessChecker() +{ + // Fast path. + if (ServiceLivenessCheckerStarted_.load(std::memory_order::relaxed)) { + return; + } + if (ServiceLivenessCheckerStarted_.exchange(true)) { + return; + } + + if (auto checker = ServiceLivenessChecker_.Acquire()) { + checker->Start(); + // There may be concurrent ServiceLivenessChecker_.Exchange() call in Stop(). + if (!ServiceLivenessChecker_.Acquire()) { + YT_UNUSED_FUTURE(checker->Stop()); + } + } +} + void TServiceBase::RegisterDiscoverRequest(const TCtxDiscoverPtr& context) { auto payload = GetDiscoverRequestPayload(context); @@ -2440,6 +2457,7 @@ void TServiceBase::RegisterDiscoverRequest(const TCtxDiscoverPtr& context) auto it = DiscoverRequestsByPayload_.find(payload); if (it == DiscoverRequestsByPayload_.end()) { readerGuard.Release(); + StartServiceLivenessChecker(); auto writerGuard = WriterGuard(DiscoverRequestsByPayloadLock_); DiscoverRequestsByPayload_[payload].Insert(context, 0); } else { @@ -2706,8 +2724,9 @@ TFuture<void> TServiceBase::Stop() } } - YT_UNUSED_FUTURE(ServiceLivenessChecker_->Stop()); - + if (auto checker = ServiceLivenessChecker_.Exchange(nullptr)) { + YT_UNUSED_FUTURE(checker->Stop()); + } return StopResult_.ToFuture(); } diff --git a/yt/yt/core/rpc/service_detail.h b/yt/yt/core/rpc/service_detail.h index 531a03018d..e19338343a 100644 --- a/yt/yt/core/rpc/service_detail.h +++ b/yt/yt/core/rpc/service_detail.h @@ -987,7 +987,8 @@ private: std::atomic<bool> EnableErrorCodeCounter_ = false; - const NConcurrency::TPeriodicExecutorPtr ServiceLivenessChecker_; + std::atomic<bool> ServiceLivenessCheckerStarted_ = false; + TAtomicIntrusivePtr<NConcurrency::TPeriodicExecutor> ServiceLivenessChecker_; using TDiscoverRequestSet = TConcurrentHashMap<TCtxDiscoverPtr, int>; THashMap<TString, TDiscoverRequestSet> DiscoverRequestsByPayload_; @@ -1075,6 +1076,7 @@ private: void IncrementActiveRequestCount(); void DecrementActiveRequestCount(); + void StartServiceLivenessChecker(); void RegisterDiscoverRequest(const TCtxDiscoverPtr& context); void ReplyDiscoverRequest(const TCtxDiscoverPtr& context, bool isUp); |