aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorpechatnov <pechatnov@yandex-team.com>2025-02-13 15:01:37 +0300
committerpechatnov <pechatnov@yandex-team.com>2025-02-13 15:28:49 +0300
commita8b10ce3de933ec55e006347e27da8f08e9cc598 (patch)
treecb80754d8b26803051431a3754b73aa02601d4cb
parentdb0b8a1c047b7a7770e1649d0fa3551456bc96c2 (diff)
downloadydb-a8b10ce3de933ec55e006347e27da8f08e9cc598.tar.gz
YT: Fix race on vptr in TServiceBase
commit_hash:d96419b9ca6a790844a064b3da7710e45964159f
-rw-r--r--yt/yt/core/rpc/service_detail.cpp27
-rw-r--r--yt/yt/core/rpc/service_detail.h4
2 files changed, 26 insertions, 5 deletions
diff --git a/yt/yt/core/rpc/service_detail.cpp b/yt/yt/core/rpc/service_detail.cpp
index f2dec276cb..065e417c1c 100644
--- a/yt/yt/core/rpc/service_detail.cpp
+++ b/yt/yt/core/rpc/service_detail.cpp
@@ -1654,8 +1654,6 @@ TServiceBase::TServiceBase(
Profiler_.AddFuncGauge("/authentication_queue_size", MakeStrong(this), [this] {
return AuthenticationQueueSize_.load(std::memory_order::relaxed);
});
-
- ServiceLivenessChecker_->Start();
}
const TServiceId& TServiceBase::GetServiceId() const
@@ -2431,6 +2429,25 @@ void TServiceBase::DecrementActiveRequestCount()
void TServiceBase::InitContext(IServiceContext* /*context*/)
{ }
+void TServiceBase::StartServiceLivenessChecker()
+{
+ // Fast path.
+ if (ServiceLivenessCheckerStarted_.load(std::memory_order::relaxed)) {
+ return;
+ }
+ if (ServiceLivenessCheckerStarted_.exchange(true)) {
+ return;
+ }
+
+ if (auto checker = ServiceLivenessChecker_.Acquire()) {
+ checker->Start();
+ // There may be concurrent ServiceLivenessChecker_.Exchange() call in Stop().
+ if (!ServiceLivenessChecker_.Acquire()) {
+ YT_UNUSED_FUTURE(checker->Stop());
+ }
+ }
+}
+
void TServiceBase::RegisterDiscoverRequest(const TCtxDiscoverPtr& context)
{
auto payload = GetDiscoverRequestPayload(context);
@@ -2440,6 +2457,7 @@ void TServiceBase::RegisterDiscoverRequest(const TCtxDiscoverPtr& context)
auto it = DiscoverRequestsByPayload_.find(payload);
if (it == DiscoverRequestsByPayload_.end()) {
readerGuard.Release();
+ StartServiceLivenessChecker();
auto writerGuard = WriterGuard(DiscoverRequestsByPayloadLock_);
DiscoverRequestsByPayload_[payload].Insert(context, 0);
} else {
@@ -2706,8 +2724,9 @@ TFuture<void> TServiceBase::Stop()
}
}
- YT_UNUSED_FUTURE(ServiceLivenessChecker_->Stop());
-
+ if (auto checker = ServiceLivenessChecker_.Exchange(nullptr)) {
+ YT_UNUSED_FUTURE(checker->Stop());
+ }
return StopResult_.ToFuture();
}
diff --git a/yt/yt/core/rpc/service_detail.h b/yt/yt/core/rpc/service_detail.h
index 531a03018d..e19338343a 100644
--- a/yt/yt/core/rpc/service_detail.h
+++ b/yt/yt/core/rpc/service_detail.h
@@ -987,7 +987,8 @@ private:
std::atomic<bool> EnableErrorCodeCounter_ = false;
- const NConcurrency::TPeriodicExecutorPtr ServiceLivenessChecker_;
+ std::atomic<bool> ServiceLivenessCheckerStarted_ = false;
+ TAtomicIntrusivePtr<NConcurrency::TPeriodicExecutor> ServiceLivenessChecker_;
using TDiscoverRequestSet = TConcurrentHashMap<TCtxDiscoverPtr, int>;
THashMap<TString, TDiscoverRequestSet> DiscoverRequestsByPayload_;
@@ -1075,6 +1076,7 @@ private:
void IncrementActiveRequestCount();
void DecrementActiveRequestCount();
+ void StartServiceLivenessChecker();
void RegisterDiscoverRequest(const TCtxDiscoverPtr& context);
void ReplyDiscoverRequest(const TCtxDiscoverPtr& context, bool isUp);